Merge pull request #1552 from croneter/py3-fix-racing
Fix a racing condition that could lead to the sync getting stuck
This commit is contained in:
commit
5bde2c6f98
2 changed files with 20 additions and 1 deletions
|
@ -170,6 +170,24 @@ class ProcessingQueue(queue.Queue, object):
|
||||||
with self.mutex:
|
with self.mutex:
|
||||||
self._add_section(section)
|
self._add_section(section)
|
||||||
|
|
||||||
|
def change_section_number_of_items(self, section, number_of_items):
|
||||||
|
"""
|
||||||
|
Hit this method if you've reset section.number_of_items to make
|
||||||
|
sure we're not blocking
|
||||||
|
"""
|
||||||
|
with self.mutex:
|
||||||
|
self._change_section_number_of_items(section, number_of_items)
|
||||||
|
|
||||||
|
def _change_section_number_of_items(self, section, number_of_items):
|
||||||
|
section.number_of_items = number_of_items
|
||||||
|
if (self._current_section == section
|
||||||
|
and self._counter == number_of_items):
|
||||||
|
# We were actually waiting for more items to come in - but there
|
||||||
|
# aren't any!
|
||||||
|
self._init_next_section()
|
||||||
|
if self._qsize() > 0:
|
||||||
|
self.not_empty.notify()
|
||||||
|
|
||||||
def _add_section(self, section):
|
def _add_section(self, section):
|
||||||
self._sections.append(section)
|
self._sections.append(section)
|
||||||
self._queues.append(
|
self._queues.append(
|
||||||
|
|
|
@ -61,7 +61,8 @@ class FillMetadataQueue(common.LibrarySyncMixin,
|
||||||
count += 1
|
count += 1
|
||||||
# We might have received LESS items from the PMS than anticipated.
|
# We might have received LESS items from the PMS than anticipated.
|
||||||
# Ensures that our queues finish
|
# Ensures that our queues finish
|
||||||
section.number_of_items = count
|
self.processing_queue.change_section_number_of_items(section,
|
||||||
|
count)
|
||||||
LOG.debug('%s items to process for section %s',
|
LOG.debug('%s items to process for section %s',
|
||||||
section.number_of_items, section)
|
section.number_of_items, section)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue