Merge pull request #1117 from croneter/fix-queue

Fix yet another rare but annoying bug where PKC becomes unresponsive during sync
This commit is contained in:
croneter 2020-02-17 19:19:47 +01:00 committed by GitHub
commit 15241aab5d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 21 deletions

View file

@@ -222,15 +222,6 @@ class ProcessingQueue(Queue.Queue, object):
""" """
Call only when a section has been completely exhausted Call only when a section has been completely exhausted
""" """
# Might have some items left if we lowered section.number_of_items
leftover = self._current_queue._qsize()
if leftover:
LOG.warn('Still have %s items in the current queue', leftover)
self.unfinished_tasks -= leftover
if self.unfinished_tasks == 0:
self.all_tasks_done.notify_all()
elif self.unfinished_tasks < 0:
raise RuntimeError('Got negative number of unfinished_tasks')
self._sections.popleft() self._sections.popleft()
self._queues.popleft() self._queues.popleft()
self._activate_next_section() self._activate_next_section()

View file

@@ -3,7 +3,7 @@ from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
from Queue import Empty from Queue import Empty
from . import common from . import common, sections
from ..plex_db import PlexDB from ..plex_db import PlexDB
from .. import backgroundthread from .. import backgroundthread
@@ -19,10 +19,12 @@ class FillMetadataQueue(common.LibrarySyncMixin,
queue. Will use a COPIED plex.db file (plex-copy.db) in order to read much queue. Will use a COPIED plex.db file (plex-copy.db) in order to read much
faster without the writing thread stalling faster without the writing thread stalling
""" """
def __init__(self, repair, section_queue, get_metadata_queue): def __init__(self, repair, section_queue, get_metadata_queue,
processing_queue):
self.repair = repair self.repair = repair
self.section_queue = section_queue self.section_queue = section_queue
self.get_metadata_queue = get_metadata_queue self.get_metadata_queue = get_metadata_queue
self.processing_queue = processing_queue
super(FillMetadataQueue, self).__init__() super(FillMetadataQueue, self).__init__()
def _process_section(self, section): def _process_section(self, section):
@@ -31,6 +33,7 @@ class FillMetadataQueue(common.LibrarySyncMixin,
LOG.debug('Process section %s with %s items', LOG.debug('Process section %s with %s items',
section, section.number_of_items) section, section.number_of_items)
count = 0 count = 0
do_process_section = False
with PlexDB(lock=False, copy=True) as plexdb: with PlexDB(lock=False, copy=True) as plexdb:
for xml in section.iterator: for xml in section.iterator:
if self.should_cancel(): if self.should_cancel():
@@ -52,10 +55,14 @@ class FillMetadataQueue(common.LibrarySyncMixin,
section.sync_successful = False section.sync_successful = False
break break
count += 1 count += 1
if not do_process_section:
do_process_section = True
self.processing_queue.add_section(section)
LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section)
# We might have received LESS items from the PMS than anticipated. # We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish # Ensures that our queues finish
LOG.debug('Expected to process %s items, actually processed %s for ' LOG.debug('%s items to process for section %s', count, section)
'section %s', section.number_of_items, count, section)
section.number_of_items = count section.number_of_items = count
def _run(self): def _run(self):
@@ -67,3 +74,5 @@ class FillMetadataQueue(common.LibrarySyncMixin,
self._process_section(section) self._process_section(section)
# Signal the download metadata threads to stop with a sentinel # Signal the download metadata threads to stop with a sentinel
self.get_metadata_queue.put(None) self.get_metadata_queue.put(None)
# Sentinel for the process_thread once we added everything else
self.processing_queue.add_sentinel(sections.Section())

View file

@@ -83,7 +83,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
scanner_thread = FillMetadataQueue(self.repair, scanner_thread = FillMetadataQueue(self.repair,
section_queue, section_queue,
get_metadata_queue) get_metadata_queue,
processing_queue)
scanner_thread.start() scanner_thread.start()
metadata_threads = [ metadata_threads = [
GetMetadataThread(get_metadata_queue, processing_queue) GetMetadataThread(get_metadata_queue, processing_queue)
@@ -136,8 +137,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
self.successful = False self.successful = False
def threaded_get_generators(self, kinds, section_queue, processing_queue, def threaded_get_generators(self, kinds, section_queue, all_items):
all_items):
""" """
Getting iterators is costly, so let's do it in a dedicated thread Getting iterators is costly, so let's do it in a dedicated thread
""" """
@@ -171,7 +171,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
else: else:
section.number_of_items = section.iterator.total section.number_of_items = section.iterator.total
if section.number_of_items > 0: if section.number_of_items > 0:
processing_queue.add_section(section)
section_queue.put(section) section_queue.put(section)
LOG.debug('Put section in queue with %s items: %s', LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section) section.number_of_items, section)
@@ -180,8 +179,6 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
finally: finally:
# Sentinel for the section queue # Sentinel for the section queue
section_queue.put(None) section_queue.put(None)
# Sentinel for the process_thread once we added everything else
processing_queue.add_sentinel(sections.Section())
LOG.debug('Exiting threaded_get_generators') LOG.debug('Exiting threaded_get_generators')
def full_library_sync(self): def full_library_sync(self):
@@ -202,7 +199,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
# We need to enforce syncing e.g. show before season before episode # We need to enforce syncing e.g. show before season before episode
bg.FunctionAsTask(self.threaded_get_generators, bg.FunctionAsTask(self.threaded_get_generators,
None, None,
kinds, section_queue, processing_queue, False).start() kinds, section_queue, False).start()
# Do the heavy lifting # Do the heavy lifting
self.process_new_and_changed_items(section_queue, processing_queue) self.process_new_and_changed_items(section_queue, processing_queue)
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
@@ -236,7 +233,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
self.dialog = None self.dialog = None
bg.FunctionAsTask(self.threaded_get_generators, bg.FunctionAsTask(self.threaded_get_generators,
None, None,
kinds, section_queue, processing_queue, True).start() kinds, section_queue, True).start()
self.processing_loop_playstates(section_queue) self.processing_loop_playstates(section_queue)
if self.should_cancel() or not self.successful: if self.should_cancel() or not self.successful:
return return