diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index fa45883d..c74fa83e 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -222,15 +222,6 @@ class ProcessingQueue(Queue.Queue, object): """ Call only when a section has been completely exhausted """ - # Might have some items left if we lowered section.number_of_items - leftover = self._current_queue._qsize() - if leftover: - LOG.warn('Still have %s items in the current queue', leftover) - self.unfinished_tasks -= leftover - if self.unfinished_tasks == 0: - self.all_tasks_done.notify_all() - elif self.unfinished_tasks < 0: - raise RuntimeError('Got negative number of unfinished_tasks') self._sections.popleft() self._queues.popleft() self._activate_next_section() diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index 9dedcf32..37e8eef0 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -3,7 +3,7 @@ from __future__ import absolute_import, division, unicode_literals from logging import getLogger from Queue import Empty -from . import common +from . import common, sections from ..plex_db import PlexDB from .. import backgroundthread @@ -19,10 +19,12 @@ class FillMetadataQueue(common.LibrarySyncMixin, queue. Will use a COPIED plex.db file (plex-copy.db) in order to read much faster without the writing thread stalling """ - def __init__(self, repair, section_queue, get_metadata_queue): + def __init__(self, repair, section_queue, get_metadata_queue, + processing_queue): self.repair = repair self.section_queue = section_queue self.get_metadata_queue = get_metadata_queue + self.processing_queue = processing_queue super(FillMetadataQueue, self).__init__() def _process_section(self, section): @@ -31,6 +33,7 @@ class FillMetadataQueue(common.LibrarySyncMixin, LOG.debug('Process section %s with %s items', section, section.number_of_items) count = 0 + do_process_section = False with PlexDB(lock=False, copy=True) as plexdb: for xml in section.iterator: if self.should_cancel(): @@ -52,10 +55,14 @@ class FillMetadataQueue(common.LibrarySyncMixin, section.sync_successful = False break count += 1 + if not do_process_section: + do_process_section = True + self.processing_queue.add_section(section) + LOG.debug('Put section in queue with %s items: %s', + section.number_of_items, section) # We might have received LESS items from the PMS than anticipated. # Ensures that our queues finish - LOG.debug('Expected to process %s items, actually processed %s for ' - 'section %s', section.number_of_items, count, section) + LOG.debug('%s items to process for section %s', count, section) section.number_of_items = count def _run(self): @@ -67,3 +74,5 @@ class FillMetadataQueue(common.LibrarySyncMixin, self._process_section(section) # Signal the download metadata threads to stop with a sentinel self.get_metadata_queue.put(None) + # Sentinel for the process_thread once we added everything else + self.processing_queue.add_sentinel(sections.Section()) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 5a9ac400..cdb85e30 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -83,7 +83,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) scanner_thread = FillMetadataQueue(self.repair, section_queue, - get_metadata_queue) + get_metadata_queue, + processing_queue) scanner_thread.start() metadata_threads = [ GetMetadataThread(get_metadata_queue, processing_queue) @@ -136,8 +137,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): LOG.error('Could not entirely process section %s', section) self.successful = False - def threaded_get_generators(self, kinds, section_queue, processing_queue, - all_items): + def threaded_get_generators(self, kinds, section_queue, all_items): """ Getting iterators is costly, so let's do it in a dedicated thread """ @@ -171,7 +171,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): else: section.number_of_items = section.iterator.total if section.number_of_items > 0: - processing_queue.add_section(section) section_queue.put(section) LOG.debug('Put section in queue with %s items: %s', section.number_of_items, section) @@ -180,8 +179,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): finally: # Sentinel for the section queue section_queue.put(None) - # Sentinel for the process_thread once we added everything else - processing_queue.add_sentinel(sections.Section()) LOG.debug('Exiting threaded_get_generators') def full_library_sync(self): @@ -202,7 +199,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): # We need to enforce syncing e.g. show before season before episode bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, section_queue, processing_queue, False).start() + kinds, section_queue, False).start() # Do the heavy lifting self.process_new_and_changed_items(section_queue, processing_queue) common.update_kodi_library(video=True, music=True) @@ -236,7 +233,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): self.dialog = None bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, section_queue, processing_queue, True).start() + kinds, section_queue, True).start() self.processing_loop_playstates(section_queue) if self.should_cancel() or not self.successful: return