From fe857cb6091cd87d7ff4d751b53278a8b9ceea84 Mon Sep 17 00:00:00 2001 From: croneter Date: Tue, 17 Dec 2019 16:01:35 +0100 Subject: [PATCH 1/2] Improve thread pool management to render PKC snappier --- resources/lib/backgroundthread.py | 54 +++++++++++++++++++------------ resources/lib/service_entry.py | 1 + 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index e4eb44d7..835ea16f 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -12,6 +12,7 @@ import xbmc from . import utils, app, variables as v +WORKER_COUNT = 3 LOG = getLogger('PLEX.threads') @@ -346,6 +347,11 @@ class Task(object): return not self.finished and not self._canceled +class ShutdownSentinel(Task): + def run(self): + pass + + class FunctionAsTask(Task): def __init__(self, function, callback, *args, **kwargs): self._function = function @@ -372,7 +378,7 @@ class MutablePriorityQueue(Queue.PriorityQueue): lowest = self.queue and min(self.queue) or None except Exception: lowest = None - utils.ERROR() + utils.ERROR(notify=True) finally: self.mutex.release() return lowest @@ -393,7 +399,7 @@ class BackgroundWorker(object): try: task._run() except Exception: - utils.ERROR() + utils.ERROR(notify=True) def abort(self): self._abort = True @@ -423,13 +429,13 @@ class BackgroundWorker(object): except Queue.Empty: LOG.debug('(%s): Idle', self.name) - def shutdown(self): + def shutdown(self, block=True): self.abort() if self._task: self._task.cancel() - if self._thread and self._thread.isAlive(): + if block and self._thread and self._thread.isAlive(): LOG.debug('thread (%s): Waiting...', self.name) self._thread.join() LOG.debug('thread (%s): Done', self.name) @@ -444,16 +450,17 @@ class NonstoppingBackgroundWorker(BackgroundWorker): super(NonstoppingBackgroundWorker, self).__init__(queue, name) def _queueLoop(self): + LOG.debug('Starting Worker %s', self.name) while not self.aborted(): - try: - self._task = self._queue.get_nowait() - self._working = True - self._runTask(self._task) - self._working = False - self._queue.task_done() - self._task = None - except Queue.Empty: - app.APP.monitor.waitForAbort(0.05) + self._task = self._queue.get() + if self._task is ShutdownSentinel: + break + self._working = True + self._runTask(self._task) + self._working = False + self._queue.task_done() + self._task = None + LOG.debug('Exiting Worker %s', self.name) def working(self): return self._working @@ -465,7 +472,10 @@ class BackgroundThreader: self._queue = MutablePriorityQueue() self._abort = False self.priority = -1 - self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)] + self.workers = [ + worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) + for x in range(worker_count) + ] def _nextPriority(self): self.priority += 1 @@ -480,11 +490,11 @@ class BackgroundThreader: def aborted(self): return self._abort or xbmc.abortRequested - def shutdown(self): + def shutdown(self, block=True): self.abort() - + self.addTasksToFront([ShutdownSentinel() for _ in self.workers]) for w in self.workers: - w.shutdown() + w.shutdown(block) def addTask(self, task): task.priority = self._nextPriority() @@ -537,7 +547,9 @@ class BackgroundThreader: class ThreaderManager: - def __init__(self, worker=BackgroundWorker, worker_count=6): + def __init__(self, + worker=NonstoppingBackgroundWorker, + worker_count=WORKER_COUNT): self.index = 0 self.abandoned = [] self._workerhandler = worker @@ -557,10 +569,10 @@ class ThreaderManager: self.threader = BackgroundThreader(name=str(self.index), worker=self._workerhandler) - def shutdown(self): - self.threader.shutdown() + def shutdown(self, block=True): + self.threader.shutdown(block) for a in self.abandoned: - a.shutdown() + a.shutdown(block) BGThreader = ThreaderManager() diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index 6d0ac9df..ac65266e 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -544,6 +544,7 @@ class Service(object): # Tell all threads to terminate (e.g. several lib sync threads) LOG.debug('Aborting all threads') app.APP.stop_pkc = True + backgroundthread.BGThreader.shutdown(block=False) # Load/Reset PKC entirely - important for user/Kodi profile switch # Clear video nodes properties library_sync.clear_window_vars() From 0255551ea983161ea5e666afd15581891459299c Mon Sep 17 00:00:00 2001 From: croneter Date: Tue, 17 Dec 2019 16:10:24 +0100 Subject: [PATCH 2/2] Don't spin up 2 separate threads but use the thread pool --- resources/lib/library_sync/full_sync.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index d885d8d2..82039dbd 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -10,7 +10,7 @@ from .get_metadata import GetMetadataThread from .fill_metadata_queue import FillMetadataQueue from .process_metadata import ProcessMetadataThread from . import common, sections -from .. import utils, timing, backgroundthread, variables as v, app +from .. import utils, timing, backgroundthread as bg, variables as v, app from .. import plex_functions as PF, itemtypes, path_ops if common.PLAYLIST_SYNC_ENABLED: @@ -30,7 +30,7 @@ UPDATED_AT_SAFETY = 60 * 5 LAST_VIEWED_AT_SAFETY = 60 * 5 -class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): +class FullSync(common.LibrarySyncMixin, bg.KillableThread): def __init__(self, repair, callback, show_dialog): """ repair=True: force sync EVERY item @@ -48,7 +48,7 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): self.section_queue = Queue.Queue() self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) - self.processing_queue = backgroundthread.ProcessingQueue(maxsize=XML_QUEUE_SIZE) + self.processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE) self.current_time = timing.plex_now() self.last_section = sections.Section() @@ -200,10 +200,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): ]) # ADD NEW ITEMS # We need to enforce syncing e.g. show before season before episode - thread = backgroundthread.KillableThread( - target=self.threaded_get_generators, - args=(kinds, self.section_queue, False)) - thread.start() + bg.FunctionAsTask(self.threaded_get_generators, + None, + kinds, self.section_queue, False).start() # Do the heavy lifting self.processing_loop_new_and_changed_items() common.update_kodi_library(video=True, music=True) @@ -235,10 +234,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): # Close the progress indicator dialog self.dialog.close() self.dialog = None - thread = backgroundthread.KillableThread( - target=self.threaded_get_generators, - args=(kinds, self.section_queue, True)) - thread.start() + bg.FunctionAsTask(self.threaded_get_generators, + None, + kinds, self.section_queue, True).start() self.processing_loop_playstates() if self.should_cancel() or not self.successful: return