From 06f7d88d22d0b83170bca8849602ac48355c3bc4 Mon Sep 17 00:00:00 2001 From: croneter Date: Thu, 28 Jan 2021 10:02:55 +0100 Subject: [PATCH 1/3] Add a ton of debug logging for library sync to find deadlock --- resources/lib/backgroundthread.py | 16 ++++++++++++++++ resources/lib/library_sync/process_metadata.py | 14 ++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 7af48b7d..204b882c 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -143,6 +143,7 @@ class ProcessingQueue(queue.Queue, object): PKC customization of Queue.put. item needs to be the tuple (count [int], {'section': [Section], 'xml': [etree xml]}) """ + LOG.debug('Entering ProcessingQueue.put with %s', item) self.not_full.acquire() try: if self.maxsize > 0: @@ -151,6 +152,7 @@ class ProcessingQueue(queue.Queue, object): raise queue.Full elif timeout is None: while self._total_qsize() == self.maxsize: + LOG.debug('ProcessingQueue._get - waiting in put') self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -160,14 +162,17 @@ class ProcessingQueue(queue.Queue, object): remaining = endtime - _time() if remaining <= 0.0: raise queue.Full + LOG.debug('ProcessingQueue._get - waiting 2 in put') self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release() + LOG.debug('Exiting ProcessingQueue.put with %s', item) def _put(self, item): + LOG.debug('Entering ProcessingQueue._put with %s', item) for i, section in enumerate(self._sections): if item[1]['section'] == section: self._queues[i]._put(item) @@ -183,6 +188,7 @@ class ProcessingQueue(queue.Queue, object): Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ + LOG.debug('Entering ProcessingQueue.add_sentinel with %s', section) self.not_full.acquire() try: section.number_of_items = 1 @@ -193,6 +199,7 @@ class ProcessingQueue(queue.Queue, object): self.not_empty.notify() finally: self.not_full.release() + LOG.debug('Exiting ProcessingQueue.add_sentinel with %s', section) def add_section(self, section): """ @@ -202,13 +209,16 @@ class ProcessingQueue(queue.Queue, object): Be sure to set section.number_of_items correctly as it will signal when processing is completely done for a specific section! """ + LOG.debug('Entering ProcessingQueue.add_section with %s', section) self.mutex.acquire() try: self._add_section(section) finally: self.mutex.release() + LOG.debug('Exiting ProcessingQueue.add_section with %s', section) def _add_section(self, section): + LOG.debug('Entering ProcessingQueue._add_section with %s', section) self._sections.append(section) self._queues.append( OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM @@ -220,18 +230,24 @@ class ProcessingQueue(queue.Queue, object): """ Call only when a section has been completely exhausted """ + LOG.debug('Entering ProcessingQueue._init_next_section') self._sections.popleft() self._queues.popleft() self._activate_next_section() def _activate_next_section(self): + LOG.debug('Entering ProcessingQueue._activate_next_section') self._counter = 0 self._current_section = self._sections[0] if self._sections else None self._current_queue = self._queues[0] if self._queues else None def _get(self): + LOG.debug('Entering ProcessingQueue._get') item = self._current_queue._get() + LOG.debug('ProcessingQueue._get - got item %s', item) self._counter += 1 + LOG.debug('ProcessingQueue._get - counter %s of %s', + self._counter, self._current_section.number_of_items) if self._counter == self._current_section.number_of_items: self._init_next_section() return item[1] diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py index 0ba21f18..aec11e02 100644 --- a/resources/lib/library_sync/process_metadata.py +++ b/resources/lib/library_sync/process_metadata.py @@ -24,6 +24,7 @@ class ProcessMetadataThread(common.LibrarySyncMixin, super(ProcessMetadataThread, self).__init__() def start_section(self, section): + LOG.debug('Entering start_section') if section != self.last_section: if self.last_section: self.finish_last_section() @@ -33,8 +34,10 @@ class ProcessMetadataThread(common.LibrarySyncMixin, app.SYNC.path_verified = False else: LOG.debug('Resume processing section %s', section) + LOG.debug('Exiting start_section') def finish_last_section(self): + LOG.debug('Entering finish_last_section') if (not self.should_cancel() and self.last_section and self.last_section.sync_successful): # Check for should_cancel() because we cannot be sure that we @@ -48,12 +51,17 @@ class ProcessMetadataThread(common.LibrarySyncMixin, elif self.last_section and not self.last_section.sync_successful: LOG.warn('Sync not successful for section %s', self.last_section) self.successful = False + LOG.debug('Exiting finish_last_section') def _get(self): + LOG.debug('Entering _get') item = {'xml': None} while item and item['xml'] is None: + LOG.debug('_get: getting item') item = self.processing_queue.get() + LOG.debug('_get: gotten item') self.processing_queue.task_done() + LOG.debug('Exiting _get') return item def _run(self): @@ -65,6 +73,7 @@ class ProcessMetadataThread(common.LibrarySyncMixin, processed = 0 self.start_section(section) while not self.should_cancel(): + LOG.debug('In loop') if item is None: break elif item['section'] != section: @@ -72,12 +81,14 @@ class ProcessMetadataThread(common.LibrarySyncMixin, self.start_section(item['section']) section = item['section'] with section.context(self.current_time) as context: + LOG.debug('Processing %s', section) while not self.should_cancel(): if item is None or item['section'] != section: break self.update_progressbar(section, item['xml'][0].get('title'), section.count) + LOG.debug('Start add_update') context.add_update(item['xml'][0], section_name=section.name, section_id=section.section_id, @@ -86,6 +97,9 @@ class ProcessMetadataThread(common.LibrarySyncMixin, section.count += 1 if processed == COMMIT_TO_DB_EVERY_X_ITEMS: processed = 0 + LOG.debug('Commiting now') context.commit() item = self._get() + LOG.debug('Gotten a new item') + LOG.debug('start finish_last_section') self.finish_last_section() From 82e38366f5e5c46bfc1e37ee4e02196936f4b32e Mon Sep 17 00:00:00 2001 From: croneter Date: Sun, 31 Jan 2021 17:38:37 +0100 Subject: [PATCH 2/3] Revert "Add a ton of debug logging for library sync to find deadlock" This reverts commit 06f7d88d22d0b83170bca8849602ac48355c3bc4. --- resources/lib/backgroundthread.py | 16 ---------------- resources/lib/library_sync/process_metadata.py | 14 -------------- 2 files changed, 30 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 204b882c..7af48b7d 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -143,7 +143,6 @@ class ProcessingQueue(queue.Queue, object): PKC customization of Queue.put. item needs to be the tuple (count [int], {'section': [Section], 'xml': [etree xml]}) """ - LOG.debug('Entering ProcessingQueue.put with %s', item) self.not_full.acquire() try: if self.maxsize > 0: @@ -152,7 +151,6 @@ class ProcessingQueue(queue.Queue, object): raise queue.Full elif timeout is None: while self._total_qsize() == self.maxsize: - LOG.debug('ProcessingQueue._get - waiting in put') self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") @@ -162,17 +160,14 @@ class ProcessingQueue(queue.Queue, object): remaining = endtime - _time() if remaining <= 0.0: raise queue.Full - LOG.debug('ProcessingQueue._get - waiting 2 in put') self.not_full.wait(remaining) self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() finally: self.not_full.release() - LOG.debug('Exiting ProcessingQueue.put with %s', item) def _put(self, item): - LOG.debug('Entering ProcessingQueue._put with %s', item) for i, section in enumerate(self._sections): if item[1]['section'] == section: self._queues[i]._put(item) @@ -188,7 +183,6 @@ class ProcessingQueue(queue.Queue, object): Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ - LOG.debug('Entering ProcessingQueue.add_sentinel with %s', section) self.not_full.acquire() try: section.number_of_items = 1 @@ -199,7 +193,6 @@ class ProcessingQueue(queue.Queue, object): self.not_empty.notify() finally: self.not_full.release() - LOG.debug('Exiting ProcessingQueue.add_sentinel with %s', section) def add_section(self, section): """ @@ -209,16 +202,13 @@ class ProcessingQueue(queue.Queue, object): Be sure to set section.number_of_items correctly as it will signal when processing is completely done for a specific section! """ - LOG.debug('Entering ProcessingQueue.add_section with %s', section) self.mutex.acquire() try: self._add_section(section) finally: self.mutex.release() - LOG.debug('Exiting ProcessingQueue.add_section with %s', section) def _add_section(self, section): - LOG.debug('Entering ProcessingQueue._add_section with %s', section) self._sections.append(section) self._queues.append( OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM @@ -230,24 +220,18 @@ class ProcessingQueue(queue.Queue, object): """ Call only when a section has been completely exhausted """ - LOG.debug('Entering ProcessingQueue._init_next_section') self._sections.popleft() self._queues.popleft() self._activate_next_section() def _activate_next_section(self): - LOG.debug('Entering ProcessingQueue._activate_next_section') self._counter = 0 self._current_section = self._sections[0] if self._sections else None self._current_queue = self._queues[0] if self._queues else None def _get(self): - LOG.debug('Entering ProcessingQueue._get') item = self._current_queue._get() - LOG.debug('ProcessingQueue._get - got item %s', item) self._counter += 1 - LOG.debug('ProcessingQueue._get - counter %s of %s', - self._counter, self._current_section.number_of_items) if self._counter == self._current_section.number_of_items: self._init_next_section() return item[1] diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py index aec11e02..0ba21f18 100644 --- a/resources/lib/library_sync/process_metadata.py +++ b/resources/lib/library_sync/process_metadata.py @@ -24,7 +24,6 @@ class ProcessMetadataThread(common.LibrarySyncMixin, super(ProcessMetadataThread, self).__init__() def start_section(self, section): - LOG.debug('Entering start_section') if section != self.last_section: if self.last_section: self.finish_last_section() @@ -34,10 +33,8 @@ class ProcessMetadataThread(common.LibrarySyncMixin, app.SYNC.path_verified = False else: LOG.debug('Resume processing section %s', section) - LOG.debug('Exiting start_section') def finish_last_section(self): - LOG.debug('Entering finish_last_section') if (not self.should_cancel() and self.last_section and self.last_section.sync_successful): # Check for should_cancel() because we cannot be sure that we @@ -51,17 +48,12 @@ class ProcessMetadataThread(common.LibrarySyncMixin, elif self.last_section and not self.last_section.sync_successful: LOG.warn('Sync not successful for section %s', self.last_section) self.successful = False - LOG.debug('Exiting finish_last_section') def _get(self): - LOG.debug('Entering _get') item = {'xml': None} while item and item['xml'] is None: - LOG.debug('_get: getting item') item = self.processing_queue.get() - LOG.debug('_get: gotten item') self.processing_queue.task_done() - LOG.debug('Exiting _get') return item def _run(self): @@ -73,7 +65,6 @@ class ProcessMetadataThread(common.LibrarySyncMixin, processed = 0 self.start_section(section) while not self.should_cancel(): - LOG.debug('In loop') if item is None: break elif item['section'] != section: @@ -81,14 +72,12 @@ class ProcessMetadataThread(common.LibrarySyncMixin, self.start_section(item['section']) section = item['section'] with section.context(self.current_time) as context: - LOG.debug('Processing %s', section) while not self.should_cancel(): if item is None or item['section'] != section: break self.update_progressbar(section, item['xml'][0].get('title'), section.count) - LOG.debug('Start add_update') context.add_update(item['xml'][0], section_name=section.name, section_id=section.section_id, @@ -97,9 +86,6 @@ class ProcessMetadataThread(common.LibrarySyncMixin, section.count += 1 if processed == COMMIT_TO_DB_EVERY_X_ITEMS: processed = 0 - LOG.debug('Commiting now') context.commit() item = self._get() - LOG.debug('Gotten a new item') - LOG.debug('start finish_last_section') self.finish_last_section() From e21f4c143d7ffc01ac67f3f85b19a4b5b50a4e81 Mon Sep 17 00:00:00 2001 From: croneter Date: Sun, 31 Jan 2021 17:39:44 +0100 Subject: [PATCH 3/3] Hopefully fix rare case when sync would get stuck indefinitely --- resources/lib/backgroundthread.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 7af48b7d..bb6454f6 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -136,6 +136,11 @@ class ProcessingQueue(queue.Queue, object): return self._current_queue._qsize() if self._current_queue else 0 def _total_qsize(self): + """ + This method is BROKEN as it can lead to a deadlock when a single item + from the current section takes longer to download then any new items + coming in + """ return sum(q._qsize() for q in self._queues) if self._queues else 0 def put(self, item, block=True, timeout=None): @@ -147,16 +152,16 @@ class ProcessingQueue(queue.Queue, object): try: if self.maxsize > 0: if not block: - if self._total_qsize() == self.maxsize: + if self._qsize() == self.maxsize: raise queue.Full elif timeout is None: - while self._total_qsize() == self.maxsize: + while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout - while self._total_qsize() == self.maxsize: + while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: raise queue.Full