diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 82039dbd..3305f188 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -35,6 +35,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): """ repair=True: force sync EVERY item """ + self.successful = True self.repair = repair self.callback = callback # For progress dialog @@ -45,21 +46,9 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): self.dialog.create(utils.lang(39714)) else: self.dialog = None - - self.section_queue = Queue.Queue() - self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) - self.processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE) self.current_time = timing.plex_now() self.last_section = sections.Section() - - self.successful = True self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' - self.threads = [ - GetMetadataThread(self.get_metadata_queue, self.processing_queue) - for _ in range(int(utils.settings('syncThreadNumber'))) - ] - for t in self.threads: - t.start() super(FullSync, self).__init__() def update_progressbar(self, section, title, current): @@ -89,33 +78,42 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH) @utils.log_time - def processing_loop_new_and_changed_items(self): + def processing_loop_new_and_changed_items(self, section_queue, + processing_queue): LOG.debug('Start working') + get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) scanner_thread = FillMetadataQueue(self.repair, - self.section_queue, - self.get_metadata_queue) + section_queue, + get_metadata_queue) scanner_thread.start() + metadata_threads = [ + GetMetadataThread(get_metadata_queue, processing_queue) + for _ in range(int(utils.settings('syncThreadNumber'))) + ] + for t in metadata_threads: + t.start() process_thread = ProcessMetadataThread(self.current_time, - self.processing_queue, + processing_queue, self.update_progressbar) process_thread.start() LOG.debug('Waiting for scanner thread to finish up') scanner_thread.join() LOG.debug('Waiting for metadata download threads to finish up') - for t in self.threads: + for t in metadata_threads: t.join() LOG.debug('Download metadata threads finished') # Sentinel for the process_thread once we added everything else - self.processing_queue.put_sentinel(sections.Section()) + processing_queue.put_sentinel(sections.Section()) + LOG.debug('Put sentinel into queue, waiting for processing thread') process_thread.join() self.successful = process_thread.successful LOG.debug('threads finished work. successful: %s', self.successful) @utils.log_time - def processing_loop_playstates(self): + def processing_loop_playstates(self, section_queue): while not self.should_cancel(): - section = self.section_queue.get() - self.section_queue.task_done() + section = section_queue.get() + section_queue.task_done() if section is None: break self.playstate_per_section(section) @@ -142,7 +140,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): LOG.error('Could not entirely process section %s', section) self.successful = False - def threaded_get_generators(self, kinds, queue, all_items): + def threaded_get_generators(self, kinds, section_queue, processing_queue, + all_items): """ Getting iterators is costly, so let's do it in a dedicated thread """ @@ -176,17 +175,19 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): else: section.number_of_items = section.iterator.total if section.number_of_items > 0: - self.processing_queue.add_section(section) - queue.put(section) + processing_queue.add_section(section) + section_queue.put(section) LOG.debug('Put section in queue with %s items: %s', section.number_of_items, section) except Exception: utils.ERROR(notify=True) finally: - queue.put(None) + section_queue.put(None) LOG.debug('Exiting threaded_get_generators') def full_library_sync(self): + section_queue = Queue.Queue() + processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE) kinds = [ (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE), (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW), @@ -202,9 +203,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): # We need to enforce syncing e.g. show before season before episode bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, self.section_queue, False).start() + kinds, section_queue, processing_queue, False).start() # Do the heavy lifting - self.processing_loop_new_and_changed_items() + self.processing_loop_new_and_changed_items(section_queue, + processing_queue) common.update_kodi_library(video=True, music=True) if self.should_cancel() or not self.successful: return @@ -236,8 +238,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): self.dialog = None bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, self.section_queue, True).start() - self.processing_loop_playstates() + kinds, section_queue, processing_queue, True).start() + self.processing_loop_playstates(section_queue) if self.should_cancel() or not self.successful: return