Merge pull request #1548 from croneter/fix-race

Fix a racing condition that could lead to the sync process getting stuck
This commit is contained in:
croneter 2021-07-23 14:47:52 +02:00 committed by GitHub
commit 2fd91ff9d6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 52 deletions

View File

@ -135,43 +135,6 @@ class ProcessingQueue(Queue.Queue, object):
def _qsize(self):
return self._current_queue._qsize() if self._current_queue else 0
def _total_qsize(self):
"""
This method is BROKEN as it can lead to a deadlock when a single item
from the current section takes longer to download then any new items
coming in
"""
return sum(q._qsize() for q in self._queues) if self._queues else 0
def put(self, item, block=True, timeout=None):
"""
PKC customization of Queue.put. item needs to be the tuple
(count [int], {'section': [Section], 'xml': [etree xml]})
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Queue.Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Queue.Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def _put(self, item):
for i, section in enumerate(self._sections):
if item[1]['section'] == section:
@ -188,16 +151,13 @@ class ProcessingQueue(Queue.Queue, object):
Once the get()-method returns None, you've received the sentinel and
you've thus exhausted the queue
"""
self.not_full.acquire()
try:
with self.not_full:
section.number_of_items = 1
self._add_section(section)
# Add the actual sentinel to the queue we just added
self._queues[-1]._put((None, None))
self.unfinished_tasks += 1
self.not_empty.notify()
finally:
self.not_full.release()
def add_section(self, section):
"""
@ -207,11 +167,8 @@ class ProcessingQueue(Queue.Queue, object):
Be sure to set section.number_of_items correctly as it will signal
when processing is completely done for a specific section!
"""
self.mutex.acquire()
try:
with self.mutex:
self._add_section(section)
finally:
self.mutex.release()
def _add_section(self, section):
self._sections.append(section)

View File

@ -46,6 +46,10 @@ class FillMetadataQueue(common.LibrarySyncMixin,
if (not self.repair and
plexdb.checksum(plex_id, section.plex_type) == checksum):
continue
if not do_process_section:
do_process_section = True
self.processing_queue.add_section(section)
LOG.debug('Put section in processing queue: %s', section)
try:
self.get_metadata_queue.put((count, plex_id, section),
timeout=QUEUE_TIMEOUT)
@ -54,16 +58,13 @@ class FillMetadataQueue(common.LibrarySyncMixin,
'aborting sync now', plex_id)
section.sync_successful = False
break
count += 1
if not do_process_section:
do_process_section = True
self.processing_queue.add_section(section)
LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section)
else:
count += 1
# We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish
LOG.debug('%s items to process for section %s', count, section)
section.number_of_items = count
LOG.debug('%s items to process for section %s',
section.number_of_items, section)
def _run(self):
while not self.should_cancel():

View File

@ -93,6 +93,7 @@ class Section(object):
"'name': '{self.name}', "
"'section_id': {self.section_id}, "
"'section_type': '{self.section_type}', "
"'plex_type': '{self.plex_type}', "
"'sync_to_kodi': {self.sync_to_kodi}, "
"'last_sync': {self.last_sync}"
"}}").format(self=self).encode('utf-8')