PlexKodiConnect/resources/lib/backgroundthread.py

430 lines
12 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
import threading
import Queue
import heapq
import xbmc
from . import utils, app
LOG = getLogger('PLEX.threads')
class KillableThread(threading.Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
self._canceled = False
self._suspended = False
self._is_not_suspended = threading.Event()
self._is_not_suspended.set()
self._suspension_reached = threading.Event()
self._is_not_asleep = threading.Event()
self._is_not_asleep.set()
self.suspension_timeout = None
super(KillableThread, self).__init__(group, target, name, args, kwargs)
def should_cancel(self):
"""
Returns True if the thread should be stopped immediately
"""
return self._canceled or app.APP.stop_pkc
def cancel(self):
"""
Call from another thread to stop this current thread
"""
self._canceled = True
# Make sure thread is running in order to exit quickly
self._is_not_suspended.set()
self._is_not_asleep.set()
def should_suspend(self):
"""
Returns True if the current thread should be suspended immediately
"""
return self._suspended
def suspend(self, block=False, timeout=None):
"""
Call from another thread to suspend the current thread. Provide a
timeout [float] in seconds optionally. block=True will block the caller
until the thread-to-be-suspended is indeed suspended
Will wake a thread that is asleep!
"""
self.suspension_timeout = timeout
self._suspended = True
self._is_not_suspended.clear()
# Make sure thread wakes up in order to suspend
self._is_not_asleep.set()
if block:
self._suspension_reached.wait()
def resume(self):
"""
Call from another thread to revive a suspended or asleep current thread
back to life
"""
self._suspended = False
self._is_not_suspended.set()
self._is_not_asleep.set()
def wait_while_suspended(self):
"""
Blocks until thread is not suspended anymore or the thread should
exit or for a period of self.suspension_timeout (set by the caller of
suspend())
Returns the value of should_cancel()
"""
self._suspension_reached.set()
self._is_not_suspended.wait(self.suspension_timeout)
self._suspension_reached.clear()
return self.should_cancel()
def is_suspended(self):
"""
Check from another thread whether the current thread is suspended
"""
return self._suspension_reached.is_set()
def sleep(self, timeout):
"""
Only call from the current thread in order to sleep for a period of
timeout [float, seconds]. Will unblock immediately if thread should
cancel (should_cancel()) or the thread should_suspend
"""
self._is_not_asleep.clear()
self._is_not_asleep.wait(timeout)
self._is_not_asleep.set()
def is_asleep(self):
"""
Check from another thread whether the current thread is asleep
"""
return not self._is_not_asleep.is_set()
class OrderedQueue(Queue.PriorityQueue, object):
"""
Queue that enforces an order on the items it returns. An item you push
onto the queue must be a tuple
(index, item)
where index=-1 is the item that will be returned first. The Queue will block
until index=-1, 0, 1, 2, 3, ... is then made available
"""
def __init__(self, maxsize=0):
super(OrderedQueue, self).__init__(maxsize)
self.smallest = -1
self.not_next_item = threading.Condition(self.mutex)
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
if item[0] == self.smallest:
self.not_next_item.notify()
def get(self, block=True, timeout=None):
"""Remove and return an item from the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize() or self.queue[0][0] != self.smallest:
raise Queue.Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
while self.queue[0][0] != self.smallest:
self.not_next_item.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = Queue._time() + timeout
while not self._qsize():
remaining = endtime - Queue._time()
if remaining <= 0.0:
raise Queue.Empty
self.not_empty.wait(remaining)
while self.queue[0][0] != self.smallest:
remaining = endtime - Queue._time()
if remaining <= 0.0:
raise Queue.Empty
self.not_next_item.wait(remaining)
item = self._get()
self.smallest += 1
self.not_full.notify()
return item
finally:
self.not_empty.release()
class Tasks(list):
def add(self, task):
for t in self:
if not t.isValid():
self.remove(t)
if isinstance(task, list):
self += task
else:
self.append(task)
def cancel(self):
while self:
self.pop().cancel()
class Task(object):
def __init__(self, priority=None):
self.priority = priority
self._canceled = False
self.finished = False
def __cmp__(self, other):
return self.priority - other.priority
def start(self):
BGThreader.addTask(self)
def _run(self):
self.run()
self.finished = True
def run(self):
raise NotImplementedError
def cancel(self):
self._canceled = True
def should_cancel(self):
return self._canceled or xbmc.abortRequested
def isValid(self):
return not self.finished and not self._canceled
class FunctionAsTask(Task):
def __init__(self, function, callback, *args, **kwargs):
self._function = function
self._callback = callback
self._args = args
self._kwargs = kwargs
super(FunctionAsTask, self).__init__()
def run(self):
result = self._function(*self._args, **self._kwargs)
if self._callback:
self._callback(result)
class MutablePriorityQueue(Queue.PriorityQueue):
def _get(self, heappop=heapq.heappop):
self.queue.sort()
return heappop(self.queue)
def lowest(self):
"""Return the lowest priority item in the queue (not reliable!)."""
self.mutex.acquire()
try:
lowest = self.queue and min(self.queue) or None
except Exception:
lowest = None
utils.ERROR()
finally:
self.mutex.release()
return lowest
class BackgroundWorker(object):
def __init__(self, queue, name=None):
self._queue = queue
self.name = name
self._thread = None
self._abort = False
self._task = None
@staticmethod
def _runTask(task):
if task._canceled:
return
try:
task._run()
except Exception:
utils.ERROR()
def abort(self):
self._abort = True
return self
def aborted(self):
return self._abort or xbmc.abortRequested
def start(self):
if self._thread and self._thread.isAlive():
return
self._thread = KillableThread(target=self._queueLoop, name='BACKGROUND-WORKER({0})'.format(self.name))
self._thread.start()
def _queueLoop(self):
if self._queue.empty():
return
LOG.debug('(%s): Active', self.name)
try:
while not self.aborted():
self._task = self._queue.get_nowait()
self._runTask(self._task)
self._queue.task_done()
self._task = None
except Queue.Empty:
LOG.debug('(%s): Idle', self.name)
def shutdown(self):
self.abort()
if self._task:
self._task.cancel()
if self._thread and self._thread.isAlive():
LOG.debug('thread (%s): Waiting...', self.name)
self._thread.join()
LOG.debug('thread (%s): Done', self.name)
def working(self):
return self._thread and self._thread.isAlive()
class NonstoppingBackgroundWorker(BackgroundWorker):
def __init__(self, queue, name=None):
self._working = False
super(NonstoppingBackgroundWorker, self).__init__(queue, name)
def _queueLoop(self):
while not self.aborted():
try:
self._task = self._queue.get_nowait()
self._working = True
self._runTask(self._task)
self._working = False
self._queue.task_done()
self._task = None
except Queue.Empty:
app.APP.monitor.waitForAbort(0.05)
def working(self):
return self._working
class BackgroundThreader:
def __init__(self, name=None, worker=BackgroundWorker, worker_count=6):
self.name = name
self._queue = MutablePriorityQueue()
self._abort = False
self.priority = -1
self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)]
def _nextPriority(self):
self.priority += 1
return self.priority
def abort(self):
self._abort = True
for w in self.workers:
w.abort()
return self
def aborted(self):
return self._abort or xbmc.abortRequested
def shutdown(self):
self.abort()
for w in self.workers:
w.shutdown()
def addTask(self, task):
task.priority = self._nextPriority()
self._queue.put(task)
self.startWorkers()
def addTasks(self, tasks):
for t in tasks:
t.priority = self._nextPriority()
self._queue.put(t)
self.startWorkers()
def addTasksToFront(self, tasks):
lowest = self.getLowestPrority()
if lowest is None:
return self.addTasks(tasks)
p = lowest - len(tasks)
for t in tasks:
t.priority = p
self._queue.put(t)
p += 1
self.startWorkers()
def startWorkers(self):
for w in self.workers:
w.start()
def working(self):
return not self._queue.empty() or self.hasTask()
def hasTask(self):
return any([w.working() for w in self.workers])
def getLowestPrority(self):
lowest = self._queue.lowest()
if not lowest:
return None
return lowest.priority
def moveToFront(self, qitem):
lowest = self.getLowestPrority()
if lowest is None:
return
qitem.priority = lowest - 1
class ThreaderManager:
def __init__(self, worker=BackgroundWorker, worker_count=6):
self.index = 0
self.abandoned = []
self._workerhandler = worker
self.threader = BackgroundThreader(name=str(self.index),
worker=worker,
worker_count=worker_count)
def __getattr__(self, name):
return getattr(self.threader, name)
def reset(self):
if self.threader._queue.empty() and not self.threader.hasTask():
return
self.index += 1
self.abandoned.append(self.threader.abort())
self.threader = BackgroundThreader(name=str(self.index),
worker=self._workerhandler)
def shutdown(self):
self.threader.shutdown()
for a in self.abandoned:
a.shutdown()
BGThreader = ThreaderManager()