Merge pull request #1090 from croneter/improve-threader

Improve thread pool management to render PKC snappier
This commit is contained in:
croneter 2019-12-20 14:18:23 +01:00 committed by GitHub
commit 4a95b1007b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 32 deletions

View file

@ -12,6 +12,7 @@ import xbmc
from . import utils, app, variables as v from . import utils, app, variables as v
WORKER_COUNT = 3
LOG = getLogger('PLEX.threads') LOG = getLogger('PLEX.threads')
@ -346,6 +347,11 @@ class Task(object):
return not self.finished and not self._canceled return not self.finished and not self._canceled
class ShutdownSentinel(Task):
def run(self):
pass
class FunctionAsTask(Task): class FunctionAsTask(Task):
def __init__(self, function, callback, *args, **kwargs): def __init__(self, function, callback, *args, **kwargs):
self._function = function self._function = function
@ -372,7 +378,7 @@ class MutablePriorityQueue(Queue.PriorityQueue):
lowest = self.queue and min(self.queue) or None lowest = self.queue and min(self.queue) or None
except Exception: except Exception:
lowest = None lowest = None
utils.ERROR() utils.ERROR(notify=True)
finally: finally:
self.mutex.release() self.mutex.release()
return lowest return lowest
@ -393,7 +399,7 @@ class BackgroundWorker(object):
try: try:
task._run() task._run()
except Exception: except Exception:
utils.ERROR() utils.ERROR(notify=True)
def abort(self): def abort(self):
self._abort = True self._abort = True
@ -423,13 +429,13 @@ class BackgroundWorker(object):
except Queue.Empty: except Queue.Empty:
LOG.debug('(%s): Idle', self.name) LOG.debug('(%s): Idle', self.name)
def shutdown(self): def shutdown(self, block=True):
self.abort() self.abort()
if self._task: if self._task:
self._task.cancel() self._task.cancel()
if self._thread and self._thread.isAlive(): if block and self._thread and self._thread.isAlive():
LOG.debug('thread (%s): Waiting...', self.name) LOG.debug('thread (%s): Waiting...', self.name)
self._thread.join() self._thread.join()
LOG.debug('thread (%s): Done', self.name) LOG.debug('thread (%s): Done', self.name)
@ -444,16 +450,17 @@ class NonstoppingBackgroundWorker(BackgroundWorker):
super(NonstoppingBackgroundWorker, self).__init__(queue, name) super(NonstoppingBackgroundWorker, self).__init__(queue, name)
def _queueLoop(self): def _queueLoop(self):
LOG.debug('Starting Worker %s', self.name)
while not self.aborted(): while not self.aborted():
try: self._task = self._queue.get()
self._task = self._queue.get_nowait() if self._task is ShutdownSentinel:
break
self._working = True self._working = True
self._runTask(self._task) self._runTask(self._task)
self._working = False self._working = False
self._queue.task_done() self._queue.task_done()
self._task = None self._task = None
except Queue.Empty: LOG.debug('Exiting Worker %s', self.name)
app.APP.monitor.waitForAbort(0.05)
def working(self): def working(self):
return self._working return self._working
@ -465,7 +472,10 @@ class BackgroundThreader:
self._queue = MutablePriorityQueue() self._queue = MutablePriorityQueue()
self._abort = False self._abort = False
self.priority = -1 self.priority = -1
self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)] self.workers = [
worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x))
for x in range(worker_count)
]
def _nextPriority(self): def _nextPriority(self):
self.priority += 1 self.priority += 1
@ -480,11 +490,11 @@ class BackgroundThreader:
def aborted(self): def aborted(self):
return self._abort or xbmc.abortRequested return self._abort or xbmc.abortRequested
def shutdown(self): def shutdown(self, block=True):
self.abort() self.abort()
self.addTasksToFront([ShutdownSentinel() for _ in self.workers])
for w in self.workers: for w in self.workers:
w.shutdown() w.shutdown(block)
def addTask(self, task): def addTask(self, task):
task.priority = self._nextPriority() task.priority = self._nextPriority()
@ -537,7 +547,9 @@ class BackgroundThreader:
class ThreaderManager: class ThreaderManager:
def __init__(self, worker=BackgroundWorker, worker_count=6): def __init__(self,
worker=NonstoppingBackgroundWorker,
worker_count=WORKER_COUNT):
self.index = 0 self.index = 0
self.abandoned = [] self.abandoned = []
self._workerhandler = worker self._workerhandler = worker
@ -557,10 +569,10 @@ class ThreaderManager:
self.threader = BackgroundThreader(name=str(self.index), self.threader = BackgroundThreader(name=str(self.index),
worker=self._workerhandler) worker=self._workerhandler)
def shutdown(self): def shutdown(self, block=True):
self.threader.shutdown() self.threader.shutdown(block)
for a in self.abandoned: for a in self.abandoned:
a.shutdown() a.shutdown(block)
BGThreader = ThreaderManager() BGThreader = ThreaderManager()

View file

@ -10,7 +10,7 @@ from .get_metadata import GetMetadataThread
from .fill_metadata_queue import FillMetadataQueue from .fill_metadata_queue import FillMetadataQueue
from .process_metadata import ProcessMetadataThread from .process_metadata import ProcessMetadataThread
from . import common, sections from . import common, sections
from .. import utils, timing, backgroundthread, variables as v, app from .. import utils, timing, backgroundthread as bg, variables as v, app
from .. import plex_functions as PF, itemtypes, path_ops from .. import plex_functions as PF, itemtypes, path_ops
if common.PLAYLIST_SYNC_ENABLED: if common.PLAYLIST_SYNC_ENABLED:
@ -30,7 +30,7 @@ UPDATED_AT_SAFETY = 60 * 5
LAST_VIEWED_AT_SAFETY = 60 * 5 LAST_VIEWED_AT_SAFETY = 60 * 5
class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): class FullSync(common.LibrarySyncMixin, bg.KillableThread):
def __init__(self, repair, callback, show_dialog): def __init__(self, repair, callback, show_dialog):
""" """
repair=True: force sync EVERY item repair=True: force sync EVERY item
@ -48,7 +48,7 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
self.section_queue = Queue.Queue() self.section_queue = Queue.Queue()
self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE) self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
self.processing_queue = backgroundthread.ProcessingQueue(maxsize=XML_QUEUE_SIZE) self.processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
self.current_time = timing.plex_now() self.current_time = timing.plex_now()
self.last_section = sections.Section() self.last_section = sections.Section()
@ -200,10 +200,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
]) ])
# ADD NEW ITEMS # ADD NEW ITEMS
# We need to enforce syncing e.g. show before season before episode # We need to enforce syncing e.g. show before season before episode
thread = backgroundthread.KillableThread( bg.FunctionAsTask(self.threaded_get_generators,
target=self.threaded_get_generators, None,
args=(kinds, self.section_queue, False)) kinds, self.section_queue, False).start()
thread.start()
# Do the heavy lifting # Do the heavy lifting
self.processing_loop_new_and_changed_items() self.processing_loop_new_and_changed_items()
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
@ -235,10 +234,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
# Close the progress indicator dialog # Close the progress indicator dialog
self.dialog.close() self.dialog.close()
self.dialog = None self.dialog = None
thread = backgroundthread.KillableThread( bg.FunctionAsTask(self.threaded_get_generators,
target=self.threaded_get_generators, None,
args=(kinds, self.section_queue, True)) kinds, self.section_queue, True).start()
thread.start()
self.processing_loop_playstates() self.processing_loop_playstates()
if self.should_cancel() or not self.successful: if self.should_cancel() or not self.successful:
return return

View file

@ -544,6 +544,7 @@ class Service(object):
# Tell all threads to terminate (e.g. several lib sync threads) # Tell all threads to terminate (e.g. several lib sync threads)
LOG.debug('Aborting all threads') LOG.debug('Aborting all threads')
app.APP.stop_pkc = True app.APP.stop_pkc = True
backgroundthread.BGThreader.shutdown(block=False)
# Load/Reset PKC entirely - important for user/Kodi profile switch # Load/Reset PKC entirely - important for user/Kodi profile switch
# Clear video nodes properties # Clear video nodes properties
library_sync.clear_window_vars() library_sync.clear_window_vars()