diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index e4eb44d7..835ea16f 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -12,6 +12,7 @@ import xbmc from . import utils, app, variables as v +WORKER_COUNT = 3 LOG = getLogger('PLEX.threads') @@ -346,6 +347,11 @@ class Task(object): return not self.finished and not self._canceled +class ShutdownSentinel(Task): + def run(self): + pass + + class FunctionAsTask(Task): def __init__(self, function, callback, *args, **kwargs): self._function = function @@ -372,7 +378,7 @@ class MutablePriorityQueue(Queue.PriorityQueue): lowest = self.queue and min(self.queue) or None except Exception: lowest = None - utils.ERROR() + utils.ERROR(notify=True) finally: self.mutex.release() return lowest @@ -393,7 +399,7 @@ class BackgroundWorker(object): try: task._run() except Exception: - utils.ERROR() + utils.ERROR(notify=True) def abort(self): self._abort = True @@ -423,13 +429,13 @@ class BackgroundWorker(object): except Queue.Empty: LOG.debug('(%s): Idle', self.name) - def shutdown(self): + def shutdown(self, block=True): self.abort() if self._task: self._task.cancel() - if self._thread and self._thread.isAlive(): + if block and self._thread and self._thread.isAlive(): LOG.debug('thread (%s): Waiting...', self.name) self._thread.join() LOG.debug('thread (%s): Done', self.name) @@ -444,16 +450,17 @@ class NonstoppingBackgroundWorker(BackgroundWorker): super(NonstoppingBackgroundWorker, self).__init__(queue, name) def _queueLoop(self): + LOG.debug('Starting Worker %s', self.name) while not self.aborted(): - try: - self._task = self._queue.get_nowait() - self._working = True - self._runTask(self._task) - self._working = False - self._queue.task_done() - self._task = None - except Queue.Empty: - app.APP.monitor.waitForAbort(0.05) + self._task = self._queue.get() + if self._task is ShutdownSentinel: + break + self._working = True + self._runTask(self._task) + self._working = False + self._queue.task_done() + self._task = None + LOG.debug('Exiting Worker %s', self.name) def working(self): return self._working @@ -465,7 +472,10 @@ class BackgroundThreader: self._queue = MutablePriorityQueue() self._abort = False self.priority = -1 - self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)] + self.workers = [ + worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) + for x in range(worker_count) + ] def _nextPriority(self): self.priority += 1 @@ -480,11 +490,11 @@ class BackgroundThreader: def aborted(self): return self._abort or xbmc.abortRequested - def shutdown(self): + def shutdown(self, block=True): self.abort() - + self.addTasksToFront([ShutdownSentinel() for _ in self.workers]) for w in self.workers: - w.shutdown() + w.shutdown(block) def addTask(self, task): task.priority = self._nextPriority() @@ -537,7 +547,9 @@ class BackgroundThreader: class ThreaderManager: - def __init__(self, worker=BackgroundWorker, worker_count=6): + def __init__(self, + worker=NonstoppingBackgroundWorker, + worker_count=WORKER_COUNT): self.index = 0 self.abandoned = [] self._workerhandler = worker @@ -557,10 +569,10 @@ class ThreaderManager: self.threader = BackgroundThreader(name=str(self.index), worker=self._workerhandler) - def shutdown(self): - self.threader.shutdown() + def shutdown(self, block=True): + self.threader.shutdown(block) for a in self.abandoned: - a.shutdown() + a.shutdown(block) BGThreader = ThreaderManager() diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index 6d0ac9df..ac65266e 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -544,6 +544,7 @@ class Service(object): # Tell all threads to terminate (e.g. several lib sync threads) LOG.debug('Aborting all threads') app.APP.stop_pkc = True + backgroundthread.BGThreader.shutdown(block=False) # Load/Reset PKC entirely - important for user/Kodi profile switch # Clear video nodes properties library_sync.clear_window_vars()