diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 5c095720..43aee604 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -135,43 +135,6 @@ class ProcessingQueue(Queue.Queue, object): def _qsize(self): return self._current_queue._qsize() if self._current_queue else 0 - def _total_qsize(self): - """ - This method is BROKEN as it can lead to a deadlock when a single item - from the current section takes longer to download then any new items - coming in - """ - return sum(q._qsize() for q in self._queues) if self._queues else 0 - - def put(self, item, block=True, timeout=None): - """ - PKC customization of Queue.put. item needs to be the tuple - (count [int], {'section': [Section], 'xml': [etree xml]}) - """ - self.not_full.acquire() - try: - if self.maxsize > 0: - if not block: - if self._qsize() == self.maxsize: - raise Queue.Full - elif timeout is None: - while self._qsize() == self.maxsize: - self.not_full.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - endtime = _time() + timeout - while self._qsize() == self.maxsize: - remaining = endtime - _time() - if remaining <= 0.0: - raise Queue.Full - self.not_full.wait(remaining) - self._put(item) - self.unfinished_tasks += 1 - self.not_empty.notify() - finally: - self.not_full.release() - def _put(self, item): for i, section in enumerate(self._sections): if item[1]['section'] == section: @@ -188,16 +151,13 @@ class ProcessingQueue(Queue.Queue, object): Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ - self.not_full.acquire() - try: + with self.not_full: section.number_of_items = 1 self._add_section(section) # Add the actual sentinel to the queue we just added self._queues[-1]._put((None, None)) self.unfinished_tasks += 1 self.not_empty.notify() - finally: - self.not_full.release() def add_section(self, section): """ @@ -207,11 +167,8 @@ class ProcessingQueue(Queue.Queue, object): Be sure to set section.number_of_items correctly as it will signal when processing is completely done for a specific section! """ - self.mutex.acquire() - try: + with self.mutex: self._add_section(section) - finally: - self.mutex.release() def _add_section(self, section): self._sections.append(section) diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index 6e7f717d..a459b7af 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -46,6 +46,10 @@ class FillMetadataQueue(common.LibrarySyncMixin, if (not self.repair and plexdb.checksum(plex_id, section.plex_type) == checksum): continue + if not do_process_section: + do_process_section = True + self.processing_queue.add_section(section) + LOG.debug('Put section in processing queue: %s', section) try: self.get_metadata_queue.put((count, plex_id, section), timeout=QUEUE_TIMEOUT) @@ -54,16 +58,13 @@ class FillMetadataQueue(common.LibrarySyncMixin, 'aborting sync now', plex_id) section.sync_successful = False break - count += 1 - if not do_process_section: - do_process_section = True - self.processing_queue.add_section(section) - LOG.debug('Put section in queue with %s items: %s', - section.number_of_items, section) + else: + count += 1 # We might have received LESS items from the PMS than anticipated. # Ensures that our queues finish - LOG.debug('%s items to process for section %s', count, section) section.number_of_items = count + LOG.debug('%s items to process for section %s', + section.number_of_items, section) def _run(self): while not self.should_cancel(): diff --git a/resources/lib/library_sync/sections.py b/resources/lib/library_sync/sections.py index 481beae5..9fa67d5e 100644 --- a/resources/lib/library_sync/sections.py +++ b/resources/lib/library_sync/sections.py @@ -93,6 +93,7 @@ class Section(object): "'name': '{self.name}', " "'section_id': {self.section_id}, " "'section_type': '{self.section_type}', " + "'plex_type': '{self.plex_type}', " "'sync_to_kodi': {self.sync_to_kodi}, " "'last_sync': {self.last_sync}" "}}").format(self=self).encode('utf-8')