From 75c84bb5fe83b7ada140660c2a6c50661b762b52 Mon Sep 17 00:00:00 2001 From: Luca Scarabello <luca.scarabello@sed.ethz.ch> Date: Thu, 1 Jun 2017 17:43:05 +0200 Subject: [PATCH] Fixed some bugs related to the update of catalog with new events --- mpi/template_manager.py | 4 ++-- mpi/template_matching.py | 27 +++++++++++++-------------- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/mpi/template_manager.py b/mpi/template_manager.py index 2a1e028..12e649a 100644 --- a/mpi/template_manager.py +++ b/mpi/template_manager.py @@ -20,11 +20,11 @@ class TemplateManager(SequentialLoader): start, shift, mag = t if abs(start - template[0]) <= 0.1: #tolerance 0.1 seconds self.logger.debug("duplicated template found (%s)" % str(template) ) - return False + return False, -1 next_id = max(self.map) + 1 if self.map else 0 self.map[next_id] = template self.add(next_id) - return True + return True, next_id def get_template(self, id): return self.map[id] diff --git a/mpi/template_matching.py b/mpi/template_matching.py index 60f6563..8e3c3fa 100644 --- a/mpi/template_matching.py +++ b/mpi/template_matching.py @@ -264,13 +264,13 @@ class TemplateMatching(object): not self.chunk_mngr.done() or \ not self.scanner_mngr.done(): + # Reclaim any slave who completed his task and check results + self.__reclaim_slaves() + # Possibly add a new template from UNDETECTED catalog events if self.first_scan: self.__detect_new_templates() - # Reclaim any slave who completed his task and check results - self.__reclaim_slaves() - # Load new templates (if any new one to load) self.__load_templates() @@ -328,7 +328,6 @@ class TemplateMatching(object): self.failed_templates.append(template) else: self.template_mngr.set_loaded(template) - self.scanner_mngr.add_template_to_process(template) self.logger.debug('(done) Loading template %d' % (template) ) # reclaim readers and mark new chunks ready to be processed @@ -341,7 +340,6 @@ class TemplateMatching(object): self.logger.warning('(Error) Reading chunk %d' % (chunk) ) else: self.chunk_mngr.set_loaded(chunk) - self.scanner_mngr.add_chunk_to_process(chunk) self.logger.debug('(done) Reading chunk %d' % (chunk) ) # reclaim processors and mark chunk/template pair as processed @@ -371,7 +369,6 @@ class TemplateMatching(object): # and it is not part of the the catalog, in this latter case # it will be ready after it has been checked for new templates if ( self.scanner_mngr.done([oldest_chunk]) and - self.template_mngr.done() and oldest_chunk not in self.catalog_chunks ): self.chunk_mngr.remove(oldest_chunk) @@ -395,13 +392,9 @@ class TemplateMatching(object): # # check the next chunk from the catalog has been xcorred with all - # available templates, i.e. the chunk has been loaded and processed - # with all loaded templates and finally all the known templates have - # been already loaded + # available templates # - if ( not self.chunk_mngr.done(chunk_id) or - not self.scanner_mngr.done([chunk_id]) or - not self.template_mngr.done() ): + if not self.scanner_mngr.done([chunk_id]): break # @@ -422,7 +415,9 @@ class TemplateMatching(object): missed_events = tmcore.missing_events(highCCC, self.catalog, starttime, endtime) for missed in missed_events: new_template = (missed[1], 0.0, missed[2]) - if self.template_mngr.add_template(new_template): + added, template_id = self.template_mngr.add_template(new_template) + if added: + self.scanner_mngr.add_template_to_process(template_id) self.logger.info('New template found: %s | %.1f | M%4.2f' % tuple(new_template) ) return @@ -501,10 +496,13 @@ class TemplateMatching(object): r = self.m_processors.move_slave(to_master=self.m_readers) if r is not None: self.logger.debug('Taking reader %d back from processors' % (r) ) + # # load chunks + # for r in self.m_readers.get_avaliable(): - # get first not fully processed chunk + # get first chunk that need processing (if we resume a previous run + # some of the chunks might have been already processed) while True: chunk = self.chunk_mngr.get_next() if chunk is None or not self.scanner_mngr.done([chunk]): @@ -519,6 +517,7 @@ class TemplateMatching(object): self.m_readers.move_slave(to_master=self.m_processors, slave=r) break + # finally load the chunk start, end, length = self.chunks.get(chunk) self.logger.debug('Reading chunk %d (start %s end %s length %s)' % (chunk, start, end, length)) self.m_readers.run(r, data=(chunk, start, length) ) -- GitLab