diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 1a5f5ca2..fa45883d 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -123,7 +123,7 @@ class ProcessingQueue(Queue.Queue, object): Put tuples (count, item) into this queue, with count being the respective position of the item in the queue, starting with 0 (zero). (None, None) is the sentinel for a single queue being exhausted, added by - put_sentinel() + add_sentinel() """ def _init(self, maxsize): self.queue = deque() @@ -131,6 +131,7 @@ class ProcessingQueue(Queue.Queue, object): self._queues = deque() self._current_section = None self._current_queue = None + # Item-index for the currently active queue self._counter = 0 def _qsize(self): @@ -140,15 +141,9 @@ class ProcessingQueue(Queue.Queue, object): return sum(q._qsize() for q in self._queues) if self._queues else 0 def put(self, item, block=True, timeout=None): - """Put an item into the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until a free slot is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Full exception if no free slot was available within that time. - Otherwise ('block' is false), put an item on the queue if a free slot - is immediately available, else raise the Full exception ('timeout' - is ignored in that case). + """ + PKC customization of Queue.put. item needs to be the tuple + (count [int], {'section': [Section], 'xml': [etree xml]}) """ self.not_full.acquire() try: @@ -168,73 +163,46 @@ class ProcessingQueue(Queue.Queue, object): if remaining <= 0.0: raise Queue.Full self.not_full.wait(remaining) - if self._put(item) == 0: - # Only notify one waiting thread if this item is put into the - # current queue - self.not_empty.notify() - else: - # Be sure to signal not_empty only once! - self._unlock_after_section_change() + self._put(item) self.unfinished_tasks += 1 + self.not_empty.notify() finally: self.not_full.release() def _put(self, item): - """ - Returns the index of the section in whose subqueue we need to put the - item into - """ for i, section in enumerate(self._sections): if item[1]['section'] == section: self._queues[i]._put(item) break else: raise RuntimeError('Could not find section for item %s' % item[1]) - return i - def _unlock_after_section_change(self): - """ - Ugly work-around if we expected more items to be synced, but we had - to lower our section.number_of_items because PKC decided that nothing - changed and we don't need to sync the respective item(s). - get() thus might block indefinitely - """ - while (self._current_section and - self._counter == self._current_section.number_of_items): - LOG.debug('Signaling completion of current section') - self._init_next_section() - if self._current_queue and self._current_queue._qsize(): - LOG.debug('Signaling not_empty') - self.not_empty.notify() - - def put_sentinel(self, section): + def add_sentinel(self, section): """ Adds a new empty section as a sentinel. Call with an empty Section() - object. + object. Call this method immediately after having added all sections + with add_section(). Once the get()-method returns None, you've received the sentinel and you've thus exhausted the queue """ - self.not_empty.acquire() + self.not_full.acquire() try: section.number_of_items = 1 self._add_section(section) # Add the actual sentinel to the queue we just added self._queues[-1]._put((None, None)) self.unfinished_tasks += 1 - if len(self._queues) == 1: - # queue was already exhausted! - self._activate_next_section() - self._counter = 0 - self.not_empty.notify() - else: - self._unlock_after_section_change() + self.not_empty.notify() finally: - self.not_empty.release() + self.not_full.release() def add_section(self, section): """ - Be sure to add all sections first before starting to pop items off this - queue or adding them to the queue + Add a new Section() to this Queue. Each section will be entirely + processed before moving on to the next section. + + Be sure to set section.number_of_items correctly as it will signal + when processing is completely done for a specific section! """ self.mutex.acquire() try: @@ -251,12 +219,24 @@ class ProcessingQueue(Queue.Queue, object): self._activate_next_section() def _init_next_section(self): + """ + Call only when a section has been completely exhausted + """ + # Might have some items left if we lowered section.number_of_items + leftover = self._current_queue._qsize() + if leftover: + LOG.warn('Still have %s items in the current queue', leftover) + self.unfinished_tasks -= leftover + if self.unfinished_tasks == 0: + self.all_tasks_done.notify_all() + elif self.unfinished_tasks < 0: + raise RuntimeError('Got negative number of unfinished_tasks') self._sections.popleft() self._queues.popleft() - self._counter = 0 self._activate_next_section() def _activate_next_section(self): + self._counter = 0 self._current_section = self._sections[0] if self._sections else None self._current_queue = self._queues[0] if self._queues else None diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index b2e7e39c..5a9ac400 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -101,9 +101,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): for t in metadata_threads: t.join() LOG.debug('Download metadata threads finished') - # Sentinel for the process_thread once we added everything else - processing_queue.put_sentinel(sections.Section()) - LOG.debug('Put sentinel into queue, waiting for processing thread') process_thread.join() self.successful = process_thread.successful LOG.debug('threads finished work. successful: %s', self.successful) @@ -181,7 +178,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread): except Exception: utils.ERROR(notify=True) finally: + # Sentinel for the section queue section_queue.put(None) + # Sentinel for the process_thread once we added everything else + processing_queue.add_sentinel(sections.Section()) LOG.debug('Exiting threaded_get_generators') def full_library_sync(self):