From d116bbdfe9c61939672e7fa2de0221894cf2fc71 Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 14 Feb 2020 13:40:46 +0100 Subject: [PATCH 1/7] Rename method --- resources/lib/backgroundthread.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 835ea16f..f10c491b 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -230,7 +230,7 @@ class ProcessingQueue(Queue.Queue, object): self.unfinished_tasks += 1 if len(self._queues) == 1: # queue was already exhausted! - self._switch_queues() + self._activate_next_section() self._counter = 0 self.not_empty.notify() else: @@ -255,15 +255,15 @@ class ProcessingQueue(Queue.Queue, object): OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM else Queue.Queue()) if self._current_section is None: - self._switch_queues() + self._activate_next_section() def _init_next_section(self): self._sections.popleft() self._queues.popleft() self._counter = 0 - self._switch_queues() + self._activate_next_section() - def _switch_queues(self): + def _activate_next_section(self): self._current_section = self._sections[0] if self._sections else None self._current_queue = self._queues[0] if self._queues else None

From b69070275fd93dbf5588c344a28515d861bd5e50 Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 14 Feb 2020 13:41:28 +0100 Subject: [PATCH 2/7] Make sure OrderedQueue returns the correct queue size --- resources/lib/backgroundthread.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index f10c491b..1a5f5ca2 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -136,14 +136,8 @@ class ProcessingQueue(Queue.Queue, object): def _qsize(self): return self._current_queue._qsize() if self._current_queue else 0 - def total_size(self): - """
- Return the approximate 
total size of all queues (not reliable!) - """ - self.mutex.acquire() - n = sum(q._qsize() for q in self._queues) if self._queues else 0 - self.mutex.release() - return n + def _total_qsize(self): + return sum(q._qsize() for q in self._queues) if self._queues else 0 def put(self, item, block=True, timeout=None): """Put an item into the queue. @@ -160,17 +154,16 @@ class ProcessingQueue(Queue.Queue, object): try: if self.maxsize > 0: if not block: - # Use >= instead of == due to OrderedQueue! - if self._qsize() >= self.maxsize: + if self._total_qsize() == self.maxsize: raise Queue.Full elif timeout is None: - while self._qsize() >= self.maxsize: + while self._total_qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout - while self._qsize() >= self.maxsize: + while self._total_qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: raise Queue.Full From ddd356deda99b7c27b560c7b8b446a2ec3cc9868 Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 14 Feb 2020 15:09:53 +0100 Subject: [PATCH 3/7] Refactor code --- resources/lib/library_sync/full_sync.py | 60 +++++++++++++------------ 1 file changed, 31 insertions(+), 29 deletions(-) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 82039dbd..3305f188 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -35,6 +35,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): """ repair=True: force sync EVERY item """ + self.successful = True self.repair = repair self.callback = callback # For progress dialog @@ -45,21 +46,9 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): self.dialog.create(utils.lang(39714)) else: self.dialog = None - - self.section_queue = Queue.Queue() - self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) - self.processing_queue = 
bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE) self.current_time = timing.plex_now() self.last_section = sections.Section() - - self.successful = True self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' - self.threads = [ - GetMetadataThread(self.get_metadata_queue, self.processing_queue) - for _ in range(int(utils.settings('syncThreadNumber'))) - ] - for t in self.threads: - t.start() super(FullSync, self).__init__() def update_progressbar(self, section, title, current): @@ -89,33 +78,42 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH) @utils.log_time - def processing_loop_new_and_changed_items(self): + def processing_loop_new_and_changed_items(self, section_queue, + processing_queue): LOG.debug('Start working') + get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) scanner_thread = FillMetadataQueue(self.repair, - self.section_queue, - self.get_metadata_queue) + section_queue, + get_metadata_queue) scanner_thread.start() + metadata_threads = [ + GetMetadataThread(get_metadata_queue, processing_queue) + for _ in range(int(utils.settings('syncThreadNumber'))) + ] + for t in metadata_threads: + t.start() process_thread = ProcessMetadataThread(self.current_time, - self.processing_queue, + processing_queue, self.update_progressbar) process_thread.start() LOG.debug('Waiting for scanner thread to finish up') scanner_thread.join() LOG.debug('Waiting for metadata download threads to finish up') - for t in self.threads: + for t in metadata_threads: t.join() LOG.debug('Download metadata threads finished') # Sentinel for the process_thread once we added everything else - self.processing_queue.put_sentinel(sections.Section()) + processing_queue.put_sentinel(sections.Section()) + LOG.debug('Put sentinel into queue, waiting for processing thread') process_thread.join() self.successful = process_thread.successful LOG.debug('threads finished work. 
successful: %s', self.successful) @utils.log_time - def processing_loop_playstates(self): + def processing_loop_playstates(self, section_queue): while not self.should_cancel(): - section = self.section_queue.get() - self.section_queue.task_done() + section = section_queue.get() + section_queue.task_done() if section is None: break self.playstate_per_section(section) @@ -142,7 +140,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): LOG.error('Could not entirely process section %s', section) self.successful = False - def threaded_get_generators(self, kinds, queue, all_items): + def threaded_get_generators(self, kinds, section_queue, processing_queue, + all_items): """ Getting iterators is costly, so let's do it in a dedicated thread """ @@ -176,17 +175,19 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): else: section.number_of_items = section.iterator.total if section.number_of_items > 0: - self.processing_queue.add_section(section) - queue.put(section) + processing_queue.add_section(section) + section_queue.put(section) LOG.debug('Put section in queue with %s items: %s', section.number_of_items, section) except Exception: utils.ERROR(notify=True) finally: - queue.put(None) + section_queue.put(None) LOG.debug('Exiting threaded_get_generators') def full_library_sync(self): + section_queue = Queue.Queue() + processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE) kinds = [ (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE), (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW), @@ -202,9 +203,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): # We need to enforce syncing e.g. 
show before season before episode bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, self.section_queue, False).start() + kinds, section_queue, processing_queue, False).start() # Do the heavy lifting - self.processing_loop_new_and_changed_items() + self.processing_loop_new_and_changed_items(section_queue, + processing_queue) common.update_kodi_library(video=True, music=True) if self.should_cancel() or not self.successful: return @@ -236,8 +238,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): self.dialog = None bg.FunctionAsTask(self.threaded_get_generators, None, - kinds, self.section_queue, True).start() - self.processing_loop_playstates() + kinds, section_queue, processing_queue, True).start() + self.processing_loop_playstates(section_queue) if self.should_cancel() or not self.successful: return From 9a0ce533ee83623868d69f973de6fd567f55f3f0 Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 14 Feb 2020 15:11:20 +0100 Subject: [PATCH 4/7] Rename method --- resources/lib/library_sync/full_sync.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 3305f188..b2e7e39c 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -78,8 +78,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH) @utils.log_time - def processing_loop_new_and_changed_items(self, section_queue, - processing_queue): + def process_new_and_changed_items(self, section_queue, processing_queue): LOG.debug('Start working') get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) scanner_thread = FillMetadataQueue(self.repair, @@ -205,8 +204,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): None, kinds, section_queue, processing_queue, False).start() # Do the heavy lifting - self.processing_loop_new_and_changed_items(section_queue, - 
processing_queue) + self.process_new_and_changed_items(section_queue, processing_queue) common.update_kodi_library(video=True, music=True) if self.should_cancel() or not self.successful: return From a4a0b075bf9d1046a942621673b70327288297aa Mon Sep 17 00:00:00 2001 From: croneter Date: Sat, 15 Feb 2020 17:11:31 +0100 Subject: [PATCH 5/7] Increase logging for the number of items we actually process --- resources/lib/library_sync/fill_metadata_queue.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index ca6c2d4e..9dedcf32 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -54,6 +54,8 @@ class FillMetadataQueue(common.LibrarySyncMixin, count += 1 # We might have received LESS items from the PMS than anticipated. # Ensures that our queues finish + LOG.debug('Expected to process %s items, actually processed %s for ' + 'section %s', section.number_of_items, count, section) section.number_of_items = count def _run(self): From 73ffb706f8ac8e637f1ab8a1cfcd045c3c6342c6 Mon Sep 17 00:00:00 2001 From: croneter Date: Sat, 15 Feb 2020 17:12:46 +0100 Subject: [PATCH 6/7] Make sure we're receiving valid item from the processing queue in case we should be aborting sync --- resources/lib/library_sync/process_metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py index cddcd20f..0d573965 100644 --- a/resources/lib/library_sync/process_metadata.py +++ b/resources/lib/library_sync/process_metadata.py @@ -52,7 +52,7 @@ class ProcessMetadataThread(common.LibrarySyncMixin, def _get(self): item = {'xml': None} - while not self.should_cancel() and item and item['xml'] is None: + while item and item['xml'] is None: item = self.processing_queue.get() self.processing_queue.task_done() return item From 
51d1538f95d616bc5c999413f810a4bcda849acd Mon Sep 17 00:00:00 2001 From: croneter Date: Sat, 15 Feb 2020 17:46:48 +0100 Subject: [PATCH 7/7] Rewire the ProcessingQueue to ensure that we can exhaust it completely and don't get stuck --- resources/lib/backgroundthread.py | 82 ++++++++++--------------- resources/lib/library_sync/full_sync.py | 6 +- 2 files changed, 34 insertions(+), 54 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 1a5f5ca2..fa45883d 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -123,7 +123,7 @@ class ProcessingQueue(Queue.Queue, object): Put tuples (count, item) into this queue, with count being the respective position of the item in the queue, starting with 0 (zero). (None, None) is the sentinel for a single queue being exhausted, added by - put_sentinel() + add_sentinel() """ def _init(self, maxsize): self.queue = deque() @@ -131,6 +131,7 @@ class ProcessingQueue(Queue.Queue, object): self._queues = deque() self._current_section = None self._current_queue = None + # Item-index for the currently active queue self._counter = 0 def _qsize(self): @@ -140,15 +141,9 @@ class ProcessingQueue(Queue.Queue, object): return sum(q._qsize() for q in self._queues) if self._queues else 0 def put(self, item, block=True, timeout=None): - """Put an item into the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until a free slot is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Full exception if no free slot was available within that time. - Otherwise ('block' is false), put an item on the queue if a free slot - is immediately available, else raise the Full exception ('timeout' - is ignored in that case). + """ + PKC customization of Queue.put. 
item needs to be the tuple + (count [int], {'section': [Section], 'xml': [etree xml]}) """ self.not_full.acquire() try: @@ -168,73 +163,46 @@ class ProcessingQueue(Queue.Queue, object): if remaining <= 0.0: raise Queue.Full self.not_full.wait(remaining) - if self._put(item) == 0: - # Only notify one waiting thread if this item is put into the - # current queue - self.not_empty.notify() - else: - # Be sure to signal not_empty only once! - self._unlock_after_section_change() + self._put(item) self.unfinished_tasks += 1 + self.not_empty.notify() finally: self.not_full.release() def _put(self, item): - """ - Returns the index of the section in whose subqueue we need to put the - item into - """ for i, section in enumerate(self._sections): if item[1]['section'] == section: self._queues[i]._put(item) break else: raise RuntimeError('Could not find section for item %s' % item[1]) - return i - def _unlock_after_section_change(self): - """ - Ugly work-around if we expected more items to be synced, but we had - to lower our section.number_of_items because PKC decided that nothing - changed and we don't need to sync the respective item(s). - get() thus might block indefinitely - """ - while (self._current_section and - self._counter == self._current_section.number_of_items): - LOG.debug('Signaling completion of current section') - self._init_next_section() - if self._current_queue and self._current_queue._qsize(): - LOG.debug('Signaling not_empty') - self.not_empty.notify() - - def put_sentinel(self, section): + def add_sentinel(self, section): """ Adds a new empty section as a sentinel. Call with an empty Section() - object. + object. Call this method immediately after having added all sections + with add_section(). 
Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ - self.not_empty.acquire() + self.not_full.acquire() try: section.number_of_items = 1 self._add_section(section) # Add the actual sentinel to the queue we just added self._queues[-1]._put((None, None)) self.unfinished_tasks += 1 - if len(self._queues) == 1: - # queue was already exhausted! - self._activate_next_section() - self._counter = 0 - self.not_empty.notify() - else: - self._unlock_after_section_change() + self.not_empty.notify() finally: - self.not_empty.release() + self.not_full.release() def add_section(self, section): """ - Be sure to add all sections first before starting to pop items off this - queue or adding them to the queue + Add a new Section() to this Queue. Each section will be entirely + processed before moving on to the next section. + + Be sure to set section.number_of_items correctly as it will signal + when processing is completely done for a specific section! 
""" self.mutex.acquire() try: @@ -251,12 +219,24 @@ class ProcessingQueue(Queue.Queue, object): self._activate_next_section() def _init_next_section(self): + """ + Call only when a section has been completely exhausted + """ + # Might have some items left if we lowered section.number_of_items + leftover = self._current_queue._qsize() + if leftover: + LOG.warn('Still have %s items in the current queue', leftover) + self.unfinished_tasks -= leftover + if self.unfinished_tasks == 0: + self.all_tasks_done.notify_all() + elif self.unfinished_tasks < 0: + raise RuntimeError('Got negative number of unfinished_tasks') self._sections.popleft() self._queues.popleft() - self._counter = 0 self._activate_next_section() def _activate_next_section(self): + self._counter = 0 self._current_section = self._sections[0] if self._sections else None self._current_queue = self._queues[0] if self._queues else None diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index b2e7e39c..5a9ac400 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -101,9 +101,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): for t in metadata_threads: t.join() LOG.debug('Download metadata threads finished') - # Sentinel for the process_thread once we added everything else - processing_queue.put_sentinel(sections.Section()) - LOG.debug('Put sentinel into queue, waiting for processing thread') process_thread.join() self.successful = process_thread.successful LOG.debug('threads finished work. successful: %s', self.successful) @@ -181,7 +178,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): except Exception: utils.ERROR(notify=True) finally: + # Sentinel for the section queue section_queue.put(None) + # Sentinel for the process_thread once we added everything else + processing_queue.add_sentinel(sections.Section()) LOG.debug('Exiting threaded_get_generators') def full_library_sync(self):