Merge pull request #1114 from croneter/fix-queue

Fix rare but annoying bug where PKC becomes unresponsive during sync
This commit is contained in:
croneter 2020-02-15 18:42:58 +01:00 committed by GitHub
commit 9952a9b44a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 72 additions and 97 deletions

View file

@ -123,7 +123,7 @@ class ProcessingQueue(Queue.Queue, object):
Put tuples (count, item) into this queue, with count being the respective Put tuples (count, item) into this queue, with count being the respective
position of the item in the queue, starting with 0 (zero). position of the item in the queue, starting with 0 (zero).
(None, None) is the sentinel for a single queue being exhausted, added by (None, None) is the sentinel for a single queue being exhausted, added by
put_sentinel() add_sentinel()
""" """
def _init(self, maxsize): def _init(self, maxsize):
self.queue = deque() self.queue = deque()
@ -131,117 +131,78 @@ class ProcessingQueue(Queue.Queue, object):
self._queues = deque() self._queues = deque()
self._current_section = None self._current_section = None
self._current_queue = None self._current_queue = None
# Item-index for the currently active queue
self._counter = 0 self._counter = 0
def _qsize(self): def _qsize(self):
return self._current_queue._qsize() if self._current_queue else 0 return self._current_queue._qsize() if self._current_queue else 0
def total_size(self): def _total_qsize(self):
""" return sum(q._qsize() for q in self._queues) if self._queues else 0
Return the approximate total size of all queues (not reliable!)
"""
self.mutex.acquire()
n = sum(q._qsize() for q in self._queues) if self._queues else 0
self.mutex.release()
return n
def put(self, item, block=True, timeout=None): def put(self, item, block=True, timeout=None):
"""Put an item into the queue. """
PKC customization of Queue.put. item needs to be the tuple
If optional args 'block' is true and 'timeout' is None (the default), (count [int], {'section': [Section], 'xml': [etree xml]})
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
""" """
self.not_full.acquire() self.not_full.acquire()
try: try:
if self.maxsize > 0: if self.maxsize > 0:
if not block: if not block:
# Use >= instead of == due to OrderedQueue! if self._total_qsize() == self.maxsize:
if self._qsize() >= self.maxsize:
raise Queue.Full raise Queue.Full
elif timeout is None: elif timeout is None:
while self._qsize() >= self.maxsize: while self._total_qsize() == self.maxsize:
self.not_full.wait() self.not_full.wait()
elif timeout < 0: elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number") raise ValueError("'timeout' must be a non-negative number")
else: else:
endtime = _time() + timeout endtime = _time() + timeout
while self._qsize() >= self.maxsize: while self._total_qsize() == self.maxsize:
remaining = endtime - _time() remaining = endtime - _time()
if remaining <= 0.0: if remaining <= 0.0:
raise Queue.Full raise Queue.Full
self.not_full.wait(remaining) self.not_full.wait(remaining)
if self._put(item) == 0: self._put(item)
# Only notify one waiting thread if this item is put into the
# current queue
self.not_empty.notify()
else:
# Be sure to signal not_empty only once!
self._unlock_after_section_change()
self.unfinished_tasks += 1 self.unfinished_tasks += 1
self.not_empty.notify()
finally: finally:
self.not_full.release() self.not_full.release()
def _put(self, item): def _put(self, item):
"""
Returns the index of the section in whose subqueue we need to put the
item into
"""
for i, section in enumerate(self._sections): for i, section in enumerate(self._sections):
if item[1]['section'] == section: if item[1]['section'] == section:
self._queues[i]._put(item) self._queues[i]._put(item)
break break
else: else:
raise RuntimeError('Could not find section for item %s' % item[1]) raise RuntimeError('Could not find section for item %s' % item[1])
return i
def _unlock_after_section_change(self): def add_sentinel(self, section):
"""
Ugly work-around if we expected more items to be synced, but we had
to lower our section.number_of_items because PKC decided that nothing
changed and we don't need to sync the respective item(s).
get() thus might block indefinitely
"""
while (self._current_section and
self._counter == self._current_section.number_of_items):
LOG.debug('Signaling completion of current section')
self._init_next_section()
if self._current_queue and self._current_queue._qsize():
LOG.debug('Signaling not_empty')
self.not_empty.notify()
def put_sentinel(self, section):
""" """
Adds a new empty section as a sentinel. Call with an empty Section() Adds a new empty section as a sentinel. Call with an empty Section()
object. object. Call this method immediately after having added all sections
with add_section().
Once the get()-method returns None, you've received the sentinel and Once the get()-method returns None, you've received the sentinel and
you've thus exhausted the queue you've thus exhausted the queue
""" """
self.not_empty.acquire() self.not_full.acquire()
try: try:
section.number_of_items = 1 section.number_of_items = 1
self._add_section(section) self._add_section(section)
# Add the actual sentinel to the queue we just added # Add the actual sentinel to the queue we just added
self._queues[-1]._put((None, None)) self._queues[-1]._put((None, None))
self.unfinished_tasks += 1 self.unfinished_tasks += 1
if len(self._queues) == 1: self.not_empty.notify()
# queue was already exhausted!
self._switch_queues()
self._counter = 0
self.not_empty.notify()
else:
self._unlock_after_section_change()
finally: finally:
self.not_empty.release() self.not_full.release()
def add_section(self, section): def add_section(self, section):
""" """
Be sure to add all sections first before starting to pop items off this Add a new Section() to this Queue. Each section will be entirely
queue or adding them to the queue processed before moving on to the next section.
Be sure to set section.number_of_items correctly as it will signal
when processing is completely done for a specific section!
""" """
self.mutex.acquire() self.mutex.acquire()
try: try:
@ -255,15 +216,27 @@ class ProcessingQueue(Queue.Queue, object):
OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM
else Queue.Queue()) else Queue.Queue())
if self._current_section is None: if self._current_section is None:
self._switch_queues() self._activate_next_section()
def _init_next_section(self): def _init_next_section(self):
"""
Call only when a section has been completely exhausted
"""
# Might have some items left if we lowered section.number_of_items
leftover = self._current_queue._qsize()
if leftover:
LOG.warn('Still have %s items in the current queue', leftover)
self.unfinished_tasks -= leftover
if self.unfinished_tasks == 0:
self.all_tasks_done.notify_all()
elif self.unfinished_tasks < 0:
raise RuntimeError('Got negative number of unfinished_tasks')
self._sections.popleft() self._sections.popleft()
self._queues.popleft() self._queues.popleft()
self._counter = 0 self._activate_next_section()
self._switch_queues()
def _switch_queues(self): def _activate_next_section(self):
self._counter = 0
self._current_section = self._sections[0] if self._sections else None self._current_section = self._sections[0] if self._sections else None
self._current_queue = self._queues[0] if self._queues else None self._current_queue = self._queues[0] if self._queues else None

View file

@ -54,6 +54,8 @@ class FillMetadataQueue(common.LibrarySyncMixin,
count += 1 count += 1
# We might have received LESS items from the PMS than anticipated. # We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish # Ensures that our queues finish
LOG.debug('Expected to process %s items, actually processed %s for '
'section %s', section.number_of_items, count, section)
section.number_of_items = count section.number_of_items = count
def _run(self): def _run(self):

View file

@ -35,6 +35,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
""" """
repair=True: force sync EVERY item repair=True: force sync EVERY item
""" """
self.successful = True
self.repair = repair self.repair = repair
self.callback = callback self.callback = callback
# For progress dialog # For progress dialog
@ -45,21 +46,9 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
self.dialog.create(utils.lang(39714)) self.dialog.create(utils.lang(39714))
else: else:
self.dialog = None self.dialog = None
self.section_queue = Queue.Queue()
self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
self.processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
self.current_time = timing.plex_now() self.current_time = timing.plex_now()
self.last_section = sections.Section() self.last_section = sections.Section()
self.successful = True
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
self.threads = [
GetMetadataThread(self.get_metadata_queue, self.processing_queue)
for _ in range(int(utils.settings('syncThreadNumber')))
]
for t in self.threads:
t.start()
super(FullSync, self).__init__() super(FullSync, self).__init__()
def update_progressbar(self, section, title, current): def update_progressbar(self, section, title, current):
@ -89,33 +78,38 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH) path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH)
@utils.log_time @utils.log_time
def processing_loop_new_and_changed_items(self): def process_new_and_changed_items(self, section_queue, processing_queue):
LOG.debug('Start working') LOG.debug('Start working')
get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
scanner_thread = FillMetadataQueue(self.repair, scanner_thread = FillMetadataQueue(self.repair,
self.section_queue, section_queue,
self.get_metadata_queue) get_metadata_queue)
scanner_thread.start() scanner_thread.start()
metadata_threads = [
GetMetadataThread(get_metadata_queue, processing_queue)
for _ in range(int(utils.settings('syncThreadNumber')))
]
for t in metadata_threads:
t.start()
process_thread = ProcessMetadataThread(self.current_time, process_thread = ProcessMetadataThread(self.current_time,
self.processing_queue, processing_queue,
self.update_progressbar) self.update_progressbar)
process_thread.start() process_thread.start()
LOG.debug('Waiting for scanner thread to finish up') LOG.debug('Waiting for scanner thread to finish up')
scanner_thread.join() scanner_thread.join()
LOG.debug('Waiting for metadata download threads to finish up') LOG.debug('Waiting for metadata download threads to finish up')
for t in self.threads: for t in metadata_threads:
t.join() t.join()
LOG.debug('Download metadata threads finished') LOG.debug('Download metadata threads finished')
# Sentinel for the process_thread once we added everything else
self.processing_queue.put_sentinel(sections.Section())
process_thread.join() process_thread.join()
self.successful = process_thread.successful self.successful = process_thread.successful
LOG.debug('threads finished work. successful: %s', self.successful) LOG.debug('threads finished work. successful: %s', self.successful)
@utils.log_time @utils.log_time
def processing_loop_playstates(self): def processing_loop_playstates(self, section_queue):
while not self.should_cancel(): while not self.should_cancel():
section = self.section_queue.get() section = section_queue.get()
self.section_queue.task_done() section_queue.task_done()
if section is None: if section is None:
break break
self.playstate_per_section(section) self.playstate_per_section(section)
@ -142,7 +136,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
self.successful = False self.successful = False
def threaded_get_generators(self, kinds, queue, all_items): def threaded_get_generators(self, kinds, section_queue, processing_queue,
all_items):
""" """
Getting iterators is costly, so let's do it in a dedicated thread Getting iterators is costly, so let's do it in a dedicated thread
""" """
@ -176,17 +171,22 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
else: else:
section.number_of_items = section.iterator.total section.number_of_items = section.iterator.total
if section.number_of_items > 0: if section.number_of_items > 0:
self.processing_queue.add_section(section) processing_queue.add_section(section)
queue.put(section) section_queue.put(section)
LOG.debug('Put section in queue with %s items: %s', LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section) section.number_of_items, section)
except Exception: except Exception:
utils.ERROR(notify=True) utils.ERROR(notify=True)
finally: finally:
queue.put(None) # Sentinel for the section queue
section_queue.put(None)
# Sentinel for the process_thread once we added everything else
processing_queue.add_sentinel(sections.Section())
LOG.debug('Exiting threaded_get_generators') LOG.debug('Exiting threaded_get_generators')
def full_library_sync(self): def full_library_sync(self):
section_queue = Queue.Queue()
processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
kinds = [ kinds = [
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE), (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE),
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW), (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW),
@ -202,9 +202,9 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
# We need to enforce syncing e.g. show before season before episode # We need to enforce syncing e.g. show before season before episode
bg.FunctionAsTask(self.threaded_get_generators, bg.FunctionAsTask(self.threaded_get_generators,
None, None,
kinds, self.section_queue, False).start() kinds, section_queue, processing_queue, False).start()
# Do the heavy lifting # Do the heavy lifting
self.processing_loop_new_and_changed_items() self.process_new_and_changed_items(section_queue, processing_queue)
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
if self.should_cancel() or not self.successful: if self.should_cancel() or not self.successful:
return return
@ -236,8 +236,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
self.dialog = None self.dialog = None
bg.FunctionAsTask(self.threaded_get_generators, bg.FunctionAsTask(self.threaded_get_generators,
None, None,
kinds, self.section_queue, True).start() kinds, section_queue, processing_queue, True).start()
self.processing_loop_playstates() self.processing_loop_playstates(section_queue)
if self.should_cancel() or not self.successful: if self.should_cancel() or not self.successful:
return return

View file

@ -52,7 +52,7 @@ class ProcessMetadataThread(common.LibrarySyncMixin,
def _get(self): def _get(self):
item = {'xml': None} item = {'xml': None}
while not self.should_cancel() and item and item['xml'] is None: while item and item['xml'] is None:
item = self.processing_queue.get() item = self.processing_queue.get()
self.processing_queue.task_done() self.processing_queue.task_done()
return item return item