Rewire the ProcessingQueue to ensure that we can exhaust it completely and don't get stuck

croneter 2020-02-15 17:46:48 +01:00
parent 73ffb706f8
commit 51d1538f95
2 changed files with 34 additions and 54 deletions


@@ -123,7 +123,7 @@ class ProcessingQueue(Queue.Queue, object):
     Put tuples (count, item) into this queue, with count being the respective
     position of the item in the queue, starting with 0 (zero).
     (None, None) is the sentinel for a single queue being exhausted, added by
-    put_sentinel()
+    add_sentinel()
     """
     def _init(self, maxsize):
         self.queue = deque()
@@ -131,6 +131,7 @@ class ProcessingQueue(Queue.Queue, object):
         self._queues = deque()
         self._current_section = None
         self._current_queue = None
+        # Item-index for the currently active queue
        self._counter = 0

     def _qsize(self):
@@ -140,15 +141,9 @@ class ProcessingQueue(Queue.Queue, object):
         return sum(q._qsize() for q in self._queues) if self._queues else 0

     def put(self, item, block=True, timeout=None):
-        """Put an item into the queue.
-
-        If optional args 'block' is true and 'timeout' is None (the default),
-        block if necessary until a free slot is available. If 'timeout' is
-        a non-negative number, it blocks at most 'timeout' seconds and raises
-        the Full exception if no free slot was available within that time.
-        Otherwise ('block' is false), put an item on the queue if a free slot
-        is immediately available, else raise the Full exception ('timeout'
-        is ignored in that case).
+        """
+        PKC customization of Queue.put. item needs to be the tuple
+            (count [int], {'section': [Section], 'xml': [etree xml]})
         """
         self.not_full.acquire()
         try:
@@ -168,73 +163,46 @@ class ProcessingQueue(Queue.Queue, object):
                         if remaining <= 0.0:
                             raise Queue.Full
                         self.not_full.wait(remaining)
-            if self._put(item) == 0:
-                # Only notify one waiting thread if this item is put into the
-                # current queue
-                self.not_empty.notify()
-            else:
-                # Be sure to signal not_empty only once!
-                self._unlock_after_section_change()
+            self._put(item)
             self.unfinished_tasks += 1
+            self.not_empty.notify()
         finally:
             self.not_full.release()

     def _put(self, item):
-        """
-        Returns the index of the section in whose subqueue we need to put the
-        item into
-        """
         for i, section in enumerate(self._sections):
             if item[1]['section'] == section:
                 self._queues[i]._put(item)
                 break
         else:
             raise RuntimeError('Could not find section for item %s' % item[1])
-        return i

-    def _unlock_after_section_change(self):
-        """
-        Ugly work-around if we expected more items to be synced, but we had
-        to lower our section.number_of_items because PKC decided that nothing
-        changed and we don't need to sync the respective item(s).
-        get() thus might block indefinitely
-        """
-        while (self._current_section and
-               self._counter == self._current_section.number_of_items):
-            LOG.debug('Signaling completion of current section')
-            self._init_next_section()
-            if self._current_queue and self._current_queue._qsize():
-                LOG.debug('Signaling not_empty')
-                self.not_empty.notify()
-
-    def put_sentinel(self, section):
+    def add_sentinel(self, section):
         """
         Adds a new empty section as a sentinel. Call with an empty Section()
-        object.
+        object. Call this method immediately after having added all sections
+        with add_section().

         Once the get()-method returns None, you've received the sentinel and
         you've thus exhausted the queue
         """
-        self.not_empty.acquire()
+        self.not_full.acquire()
         try:
             section.number_of_items = 1
             self._add_section(section)
             # Add the actual sentinel to the queue we just added
             self._queues[-1]._put((None, None))
             self.unfinished_tasks += 1
-            if len(self._queues) == 1:
-                # queue was already exhausted!
-                self._activate_next_section()
-                self._counter = 0
-                self.not_empty.notify()
-            else:
-                self._unlock_after_section_change()
+            self.not_empty.notify()
         finally:
-            self.not_empty.release()
+            self.not_full.release()

     def add_section(self, section):
         """
-        Be sure to add all sections first before starting to pop items off this
-        queue or adding them to the queue
+        Add a new Section() to this Queue. Each section will be entirely
+        processed before moving on to the next section.
+
+        Be sure to set section.number_of_items correctly as it will signal
+        when processing is completely done for a specific section!
         """
         self.mutex.acquire()
         try:
@@ -251,12 +219,24 @@ class ProcessingQueue(Queue.Queue, object):
             self._activate_next_section()

     def _init_next_section(self):
+        """
+        Call only when a section has been completely exhausted
+        """
+        # Might have some items left if we lowered section.number_of_items
+        leftover = self._current_queue._qsize()
+        if leftover:
+            LOG.warn('Still have %s items in the current queue', leftover)
+            self.unfinished_tasks -= leftover
+            if self.unfinished_tasks == 0:
+                self.all_tasks_done.notify_all()
+            elif self.unfinished_tasks < 0:
+                raise RuntimeError('Got negative number of unfinished_tasks')
         self._sections.popleft()
         self._queues.popleft()
-        self._counter = 0
         self._activate_next_section()

     def _activate_next_section(self):
+        self._counter = 0
         self._current_section = self._sections[0] if self._sections else None
         self._current_queue = self._queues[0] if self._queues else None
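Read together, the new docstrings spell out the contract of the reworked queue: register every Section() with add_section() first (with section.number_of_items set correctly), put() items as (count, {'section': ..., 'xml': ...}) tuples, and finish with exactly one add_sentinel(); a consumer then drains the queue until it receives the sentinel. The sketch below only illustrates that calling sequence and is not code from this commit: the import paths are placeholders, the 'xml' strings stand in for the etree elements PKC actually passes, and the assumption that get() returns None for the sentinel is taken from the docstring above.

# Hypothetical driver for the reworked ProcessingQueue -- import paths and
# payloads are placeholders; only the call order follows the docstrings above.
from backgroundthread import ProcessingQueue   # assumed module location
from library_sync.sections import Section      # assumed module location

processing_queue = ProcessingQueue(maxsize=200)

# 1. Register all sections up front. number_of_items later tells the queue
#    when a section has been completely processed.
movies, shows = Section(), Section()
movies.number_of_items = 2
shows.number_of_items = 1
processing_queue.add_section(movies)
processing_queue.add_section(shows)

# 2. Put (count, item) tuples; count is the item's position within its section.
processing_queue.put((0, {'section': movies, 'xml': '<movie/>'}))
processing_queue.put((1, {'section': movies, 'xml': '<movie/>'}))
processing_queue.put((0, {'section': shows, 'xml': '<show/>'}))

# 3. Exactly one sentinel, added after all sections -- the renamed
#    add_sentinel() wraps the (None, None) marker in its own empty section.
processing_queue.add_sentinel(Section())

# 4. Consumer side: per the docstring, get() returns None once the sentinel
#    has been reached, i.e. the queue is exhausted and the loop can exit.
while True:
    item = processing_queue.get()
    processing_queue.task_done()
    if item is None:
        break
    # ... process item['xml'] here ...

Splitting the end-of-queue marker out of put() is also what lets the new _init_next_section() reconcile unfinished_tasks when a section ends up with fewer items than announced, instead of relying on the removed _unlock_after_section_change() workaround.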


@@ -101,9 +101,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
         for t in metadata_threads:
             t.join()
         LOG.debug('Download metadata threads finished')
-        # Sentinel for the process_thread once we added everything else
-        processing_queue.put_sentinel(sections.Section())
-        LOG.debug('Put sentinel into queue, waiting for processing thread')
         process_thread.join()
         self.successful = process_thread.successful
         LOG.debug('threads finished work. successful: %s', self.successful)
@@ -181,7 +178,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
         except Exception:
             utils.ERROR(notify=True)
         finally:
+            # Sentinel for the section queue
             section_queue.put(None)
+            # Sentinel for the process_thread once we added everything else
+            processing_queue.add_sentinel(sections.Section())
             LOG.debug('Exiting threaded_get_generators')

     def full_library_sync(self):
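Note where the sentinel now lives: inside the finally: block of threaded_get_generators, right next to the section_queue sentinel, instead of after the metadata threads have joined. That way the processing thread always receives its end-of-work marker, even when the try: body above raises and only utils.ERROR() runs. A generic, self-contained illustration of that "sentinel in finally" pattern (plain stdlib, not PKC code; shown with Python 3 names, while the PKC code above still uses Python 2's Queue module):

# Minimal sketch of the "sentinel in finally" pattern -- stdlib only.
import queue
import threading

def produce(work_queue):
    try:
        for i in range(10):
            if i == 3:
                raise RuntimeError('simulated failure while producing')
            work_queue.put(i)
    except RuntimeError as err:
        print('producer failed:', err)
    finally:
        # Enqueued no matter what happened above, so the consumer can
        # never block forever waiting for items that will not arrive.
        work_queue.put(None)

def consume(work_queue):
    while True:
        item = work_queue.get()
        if item is None:
            break           # sentinel received: the queue is exhausted
        print('consumed', item)

q = queue.Queue()
consumer = threading.Thread(target=consume, args=(q,))
consumer.start()
produce(q)
consumer.join()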