From e36656dc81ac95993e7ab18a0cf6c78bcffb5daf Mon Sep 17 00:00:00 2001 From: croneter Date: Sun, 18 Jul 2021 15:35:22 +0200 Subject: [PATCH 1/4] Improve logging fixup logging --- resources/lib/library_sync/fill_metadata_queue.py | 6 +++--- resources/lib/library_sync/sections.py | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index 903301fe..57f64ff4 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -57,12 +57,12 @@ class FillMetadataQueue(common.LibrarySyncMixin, if not do_process_section: do_process_section = True self.processing_queue.add_section(section) - LOG.debug('Put section in queue with %s items: %s', - section.number_of_items, section) + LOG.debug('Put section in processing queue: %s', section) # We might have received LESS items from the PMS than anticipated. # Ensures that our queues finish - LOG.debug('%s items to process for section %s', count, section) section.number_of_items = count + LOG.debug('%s items to process for section %s', + section.number_of_items, section) def _run(self): while not self.should_cancel(): diff --git a/resources/lib/library_sync/sections.py b/resources/lib/library_sync/sections.py index 3a206351..affdaebb 100644 --- a/resources/lib/library_sync/sections.py +++ b/resources/lib/library_sync/sections.py @@ -92,6 +92,7 @@ class Section(object): "'name': '{self.name}', " "'section_id': {self.section_id}, " "'section_type': '{self.section_type}', " + "'plex_type': '{self.plex_type}', " "'sync_to_kodi': {self.sync_to_kodi}, " "'last_sync': {self.last_sync}" "}}").format(self=self) From c664f0571814a157111c4bb7d39f95f4411a3695 Mon Sep 17 00:00:00 2001 From: croneter Date: Sun, 18 Jul 2021 15:41:10 +0200 Subject: [PATCH 2/4] Fix a racing condition that could lead to the sync getting stuck Fixup racing --- resources/lib/library_sync/fill_metadata_queue.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index 57f64ff4..d1273860 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -45,6 +45,10 @@ class FillMetadataQueue(common.LibrarySyncMixin, if (not self.repair and plexdb.checksum(plex_id, section.plex_type) == checksum): continue + if not do_process_section: + do_process_section = True + self.processing_queue.add_section(section) + LOG.debug('Put section in processing queue: %s', section) try: self.get_metadata_queue.put((count, plex_id, section), timeout=QUEUE_TIMEOUT) @@ -53,11 +57,8 @@ class FillMetadataQueue(common.LibrarySyncMixin, 'aborting sync now', plex_id) section.sync_successful = False break - count += 1 - if not do_process_section: - do_process_section = True - self.processing_queue.add_section(section) - LOG.debug('Put section in processing queue: %s', section) + else: + count += 1 # We might have received LESS items from the PMS than anticipated. # Ensures that our queues finish section.number_of_items = count From 07a69a8fa571554d7b7e1d4798f07572b7a69b4f Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 23 Jul 2021 10:29:19 +0200 Subject: [PATCH 3/4] Remove obsolete methods --- resources/lib/backgroundthread.py | 37 ------------------------------- 1 file changed, 37 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index bb6454f6..37b94f84 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -135,43 +135,6 @@ class ProcessingQueue(queue.Queue, object): def _qsize(self): return self._current_queue._qsize() if self._current_queue else 0 - def _total_qsize(self): - """ - This method is BROKEN as it can lead to a deadlock when a single item - from the current section takes longer to download then any new items - coming in - """ - return sum(q._qsize() for q in self._queues) if self._queues else 0 - - def put(self, item, block=True, timeout=None): - """ - PKC customization of Queue.put. item needs to be the tuple - (count [int], {'section': [Section], 'xml': [etree xml]}) - """ - self.not_full.acquire() - try: - if self.maxsize > 0: - if not block: - if self._qsize() == self.maxsize: - raise queue.Full - elif timeout is None: - while self._qsize() == self.maxsize: - self.not_full.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - endtime = _time() + timeout - while self._qsize() == self.maxsize: - remaining = endtime - _time() - if remaining <= 0.0: - raise queue.Full - self.not_full.wait(remaining) - self._put(item) - self.unfinished_tasks += 1 - self.not_empty.notify() - finally: - self.not_full.release() - def _put(self, item): for i, section in enumerate(self._sections): if item[1]['section'] == section: From 143c6271aa0179e28b5d433b9f4c75a638bf00cd Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 23 Jul 2021 10:29:34 +0200 Subject: [PATCH 4/4] Switch to context manager --- resources/lib/backgroundthread.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 37b94f84..6bf119ff 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -151,16 +151,13 @@ class ProcessingQueue(queue.Queue, object): Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ - self.not_full.acquire() - try: + with self.not_full: section.number_of_items = 1 self._add_section(section) # Add the actual sentinel to the queue we just added self._queues[-1]._put((None, None)) self.unfinished_tasks += 1 self.not_empty.notify() - finally: - self.not_full.release() def add_section(self, section): """ @@ -170,11 +167,8 @@ class ProcessingQueue(queue.Queue, object): Be sure to set section.number_of_items correctly as it will signal when processing is completely done for a specific section! """ - self.mutex.acquire() - try: + with self.mutex: self._add_section(section) - finally: - self.mutex.release() def _add_section(self, section): self._sections.append(section)