PlexKodiConnect/resources/lib/backgroundthread.py

467 lines
13 KiB
Python
Raw Normal View History

2018-09-10 20:53:46 +02:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
import threading
import Queue
import heapq
import xbmc
from . import utils, app
2018-09-10 20:53:46 +02:00
2018-12-24 13:19:40 +01:00
# Module-level logger used by the thread and worker classes in this file
LOG = getLogger('PLEX.threads')
2018-09-10 20:53:46 +02:00
class KillableThread(threading.Thread):
    """
    Thread carrying cooperative "canceled" and "suspended" flags.

    Nothing in this class interrupts a running target: the code executed by
    the thread has to poll isCanceled() and wait_while_suspended() itself.
    """

    def __init__(self, group=None, target=None, name=None, args=(),
                 kwargs=None):
        """
        Same arguments as threading.Thread.

        kwargs defaults to None instead of {} so that no dict instance is
        shared between calls; threading.Thread replaces None with a fresh
        empty dict.
        """
        self._canceled = False
        # Set to True to set the thread to suspended
        self._suspended = False
        # True only while the thread sits inside wait_while_suspended()
        self.suspend_reached = False
        super(KillableThread, self).__init__(group, target, name, args, kwargs)

    def kill(self, force_and_wait=False):
        """
        Does nothing; force_and_wait is ignored. Use abort() to ask the
        thread to stop.
        """
        pass

    def isCanceled(self):
        """
        Returns True if the thread is stopped, i.e. abort() has been called
        or xbmc.abortRequested is set
        """
        return bool(self._canceled or xbmc.abortRequested)

    def abort(self):
        """
        Call to stop this thread
        """
        self._canceled = True

    def suspend(self, block=False):
        """
        Call to suspend this thread

        With block=True this call only returns once the thread has reported
        suspend_reached, or once waitForAbort() signals an abort.
        """
        self._suspended = True
        if block:
            while not self.suspend_reached:
                LOG.debug('Waiting for thread to suspend: %s', self)
                if app.APP.monitor.waitForAbort(0.1):
                    return

    def resume(self):
        """
        Call to revive a suspended thread back to life
        """
        self._suspended = False

    def wait_while_suspended(self):
        """
        Blocks until thread is not suspended anymore or the thread should
        exit.
        Returns True only if the thread should exit (=isCanceled())
        """
        # The try/finally wraps the whole loop so that suspend_reached stays
        # True for the entire suspension instead of flipping back to False
        # between two polls, which a blocking suspend() could otherwise miss
        try:
            while self.isSuspended():
                self.suspend_reached = True
                if self.isCanceled():
                    # Abort was requested while waiting. We should exit
                    return True
                if app.APP.monitor.waitForAbort(0.1):
                    return True
        finally:
            self.suspend_reached = False
        return self.isCanceled()

    def isSuspended(self):
        """
        Returns True if the thread is suspended
        """
        return self._suspended
2018-11-18 14:59:17 +01:00
2018-09-10 20:53:46 +02:00
class OrderedQueue(Queue.PriorityQueue, object):
    """
    Priority queue that hands its items out strictly in index order.

    Every item pushed onto the queue must be a tuple
        (index, item)
    The first tuple returned is the one with index=-1; afterwards get() only
    returns index 0, then 1, 2, 3, ... and waits for the index that is next
    in line if it has not been put yet.
    """

    def __init__(self, maxsize=0):
        super(OrderedQueue, self).__init__(maxsize)
        # Index that the next successful get() will return
        self.smallest = -1
        # Notified by _put() when the tuple carrying index self.smallest
        # arrives; built on the queue's own mutex
        self.not_next_item = threading.Condition(self.mutex)

    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)
        if item[0] != self.smallest:
            return
        self.not_next_item.notify()

    def _next_ready(self):
        """
        True if the head of the (non-empty) heap carries the awaited index.
        """
        return self.queue[0][0] == self.smallest

    def get(self, block=True, timeout=None):
        """
        Remove and return the tuple whose index is next in line.

        block=True, timeout=None (default): wait until that tuple is
        available.
        block=True, timeout>=0: wait at most 'timeout' seconds, then raise
        Queue.Empty. A negative timeout raises ValueError.
        block=False: return the tuple if it is available right now, else
        raise Queue.Empty ('timeout' is ignored).
        """
        with self.not_empty:
            if not block:
                if not (self._qsize() and self._next_ready()):
                    raise Queue.Empty
            elif timeout is None:
                while not self._qsize():
                    self.not_empty.wait()
                while not self._next_ready():
                    self.not_next_item.wait()
            elif timeout < 0:
                raise ValueError("'timeout' must be a non-negative number")
            else:
                deadline = Queue._time() + timeout
                while not self._qsize():
                    left = deadline - Queue._time()
                    if left <= 0.0:
                        raise Queue.Empty
                    self.not_empty.wait(left)
                while not self._next_ready():
                    left = deadline - Queue._time()
                    if left <= 0.0:
                        raise Queue.Empty
                    self.not_next_item.wait(left)
            entry = self._get()
            self.smallest += 1
            self.not_full.notify()
            return entry
2018-09-10 20:53:46 +02:00
class Tasks(list):
    """
    List of task objects. Items are expected to provide isValid() and
    cancel().
    """

    def add(self, task):
        """
        Drop every task for which isValid() is False, then add `task`.

        `task` may be a single task or a list of tasks.
        """
        # Rebuild in place instead of calling remove() while iterating over
        # the same list: removing during iteration skips the element that
        # follows each removed one, and remove() compares items with ==
        # instead of by identity
        self[:] = [t for t in self if t.isValid()]
        if isinstance(task, list):
            self.extend(task)
        else:
            self.append(task)

    def cancel(self):
        """
        Remove all tasks from the list, calling cancel() on each of them
        (last one first).
        """
        while self:
            self.pop().cancel()
class Task(object):
    """
    Base class for a unit of work. Subclasses implement run().

    Tasks are ordered by their `priority` attribute, smaller values first.
    """

    def __init__(self, priority=None):
        self.priority = priority
        self._canceled = False
        # Set to True by _run() once run() has returned
        self.finished = False

    def __cmp__(self, other):
        # Python 2 ordering hook
        return self.priority - other.priority

    def __lt__(self, other):
        # Rich comparison used by heapq, sort() and min(). Python 3 ignores
        # __cmp__, so without this tasks could not be ordered there
        return self.priority < other.priority

    def start(self):
        """
        Hand this task over to the module-level BGThreader.
        """
        BGThreader.addTask(self)

    def _run(self):
        """
        Execute run() and flag the task as finished afterwards. `finished`
        stays False if run() raises.
        """
        self.run()
        self.finished = True

    def run(self):
        """
        The actual work; must be overridden.
        """
        raise NotImplementedError

    def cancel(self):
        """
        Flag the task as canceled.
        """
        self._canceled = True

    def isCanceled(self):
        """
        Truthy if cancel() was called or xbmc.abortRequested is set.
        """
        return self._canceled or xbmc.abortRequested

    def isValid(self):
        """
        True as long as the task is neither finished nor canceled.
        """
        return not self.finished and not self._canceled
class FunctionAsTask(Task):
    """
    Task wrapping a plain callable.

    run() calls function(*args, **kwargs) and, if `callback` is truthy,
    passes the return value on to callback.
    """

    def __init__(self, function, callback, *args, **kwargs):
        super(FunctionAsTask, self).__init__()
        self._function = function
        self._callback = callback
        self._args = args
        self._kwargs = kwargs

    def run(self):
        outcome = self._function(*self._args, **self._kwargs)
        notify = self._callback
        if notify:
            notify(outcome)
2018-11-24 09:53:37 +01:00
2018-09-10 20:53:46 +02:00
class MutablePriorityQueue(Queue.PriorityQueue):
    """
    PriorityQueue that re-sorts its content on every get, so items that
    changed their ordering after they were put are still returned in the
    correct order.
    """

    def _get(self, heappop=heapq.heappop):
        # Restore the order before popping; a sorted list is a valid heap
        self.queue.sort()
        return heappop(self.queue)

    def lowest(self):
        """
        Return the smallest item in the queue without removing it, or None
        if the queue is empty or the items could not be compared (the error
        is reported through utils.ERROR()).

        Not reliable: the queue may already have changed when this returns.
        """
        with self.mutex:
            try:
                # Conditional expression instead of "x and y or None": the
                # latter wrongly yields None for a falsy smallest item (0)
                return min(self.queue) if self.queue else None
            except Exception:
                utils.ERROR()
                return None
class BackgroundWorker(object):
    """
    Runs the tasks of a queue in its own thread. The thread is started on
    demand by start() and ends as soon as the queue is empty or the worker
    is aborted.
    """

    def __init__(self, queue, name=None):
        self._queue = queue
        self.name = name
        self._thread = None
        self._abort = False
        # Task currently being processed by the thread, else None
        self._task = None

    @staticmethod
    def _runTask(task):
        """
        Run task unless it has been canceled. Exceptions raised by the task
        are reported through utils.ERROR() and not propagated.
        """
        if task._canceled:
            return
        try:
            task._run()
        except Exception:
            utils.ERROR()

    def abort(self):
        """
        Flag the worker as aborted. Returns self.
        """
        self._abort = True
        return self

    def aborted(self):
        """
        Truthy if abort() was called or xbmc.abortRequested is set.
        """
        return self._abort or xbmc.abortRequested

    def start(self):
        """
        Start the worker thread unless it is already running.
        """
        if self._thread and self._thread.is_alive():
            return
        self._thread = KillableThread(
            target=self._queueLoop,
            name='BACKGROUND-WORKER({0})'.format(self.name))
        self._thread.start()

    def _queueLoop(self):
        """
        Thread body: process tasks until the queue is empty or the worker
        is aborted.
        """
        if self._queue.empty():
            return
        LOG.debug('(%s): Active', self.name)
        try:
            while not self.aborted():
                self._task = self._queue.get_nowait()
                self._runTask(self._task)
                self._queue.task_done()
                self._task = None
        except Queue.Empty:
            LOG.debug('(%s): Idle', self.name)

    def shutdown(self):
        """
        Abort the worker, cancel the task in progress (if any) and wait for
        the thread to end.
        """
        self.abort()
        # Read self._task only once: the worker thread may reset it to None
        # between a check and the cancel() call
        task = self._task
        if task:
            task.cancel()
        if self._thread and self._thread.is_alive():
            LOG.debug('thread (%s): Waiting...', self.name)
            self._thread.join()
            LOG.debug('thread (%s): Done', self.name)

    def working(self):
        """
        Returns True if the worker thread is alive, False otherwise.
        """
        return bool(self._thread and self._thread.is_alive())
class NonstoppingBackgroundWorker(BackgroundWorker):
    """
    Worker whose thread keeps polling the queue until the worker is aborted
    instead of ending when the queue runs empty.
    """

    def __init__(self, queue, name=None):
        # True only while a task is being run
        self._working = False
        super(NonstoppingBackgroundWorker, self).__init__(queue, name)

    def _queueLoop(self):
        """
        Thread body: run queued tasks; when the queue is empty, wait for
        0.05 via the app monitor before polling again.
        """
        while not self.aborted():
            try:
                self._task = self._queue.get_nowait()
            except Queue.Empty:
                app.APP.monitor.waitForAbort(0.05)
                continue
            self._working = True
            self._runTask(self._task)
            self._working = False
            self._queue.task_done()
            self._task = None

    def working(self):
        """
        Returns True while a task is being run.
        """
        return self._working
2018-09-10 20:53:46 +02:00
class BackgroundThreader:
    """
    Feeds tasks to a set of workers through one shared
    MutablePriorityQueue. Tasks with a smaller priority number are fetched
    first.
    """

    def __init__(self, name=None, worker=BackgroundWorker, worker_count=6):
        self.name = name
        self._queue = MutablePriorityQueue()
        self._abort = False
        # Priority number handed to the most recently added task
        self.priority = -1
        self.workers = []
        for number in range(worker_count):
            label = 'queue.{0}:worker.{1}'.format(self.name, number)
            self.workers.append(worker(self._queue, label))

    def _nextPriority(self):
        """
        Increment the priority counter and return its new value.
        """
        self.priority += 1
        return self.priority

    def abort(self):
        """
        Flag this threader and all of its workers as aborted. Returns self.
        """
        self._abort = True
        for worker in self.workers:
            worker.abort()
        return self

    def aborted(self):
        """
        Truthy if abort() was called or xbmc.abortRequested is set.
        """
        return self._abort or xbmc.abortRequested

    def shutdown(self):
        """
        Abort, then shut down every worker.
        """
        self.abort()
        for worker in self.workers:
            worker.shutdown()

    def addTask(self, task):
        """
        Queue a single task behind everything added so far and start the
        workers.
        """
        task.priority = self._nextPriority()
        self._queue.put(task)
        self.startWorkers()

    def addTasks(self, tasks):
        """
        Queue several tasks, in order, behind everything added so far and
        start the workers.
        """
        for task in tasks:
            task.priority = self._nextPriority()
            self._queue.put(task)
        self.startWorkers()

    def addTasksToFront(self, tasks):
        """
        Queue several tasks, in order, with priorities below the lowest one
        currently queued. Falls back to addTasks() if no lowest priority is
        available.
        """
        lowest = self.getLowestPrority()
        if lowest is None:
            return self.addTasks(tasks)
        for priority, task in enumerate(tasks, lowest - len(tasks)):
            task.priority = priority
            self._queue.put(task)
        self.startWorkers()

    def startWorkers(self):
        """
        Call start() on every worker.
        """
        for worker in self.workers:
            worker.start()

    def working(self):
        """
        True if tasks are queued or a worker reports that it is working.
        """
        return not self._queue.empty() or self.hasTask()

    def hasTask(self):
        """
        True if any worker reports that it is working.
        """
        return any(worker.working() for worker in self.workers)

    def getLowestPrority(self):
        """
        Return the priority of the queue's lowest item, or None if the
        queue does not return one.
        """
        head = self._queue.lowest()
        return head.priority if head else None

    def moveToFront(self, qitem):
        """
        Give qitem a priority just below the lowest one currently queued.
        Does nothing if no lowest priority is available.
        """
        lowest = self.getLowestPrority()
        if lowest is not None:
            qitem.priority = lowest - 1
2018-09-10 20:53:46 +02:00
class ThreaderManager:
    """
    Owns the current BackgroundThreader and forwards unknown attribute
    lookups to it. reset() swaps in a fresh threader and keeps the aborted
    one in `abandoned` until shutdown().
    """

    def __init__(self, worker=BackgroundWorker, worker_count=6):
        # Used to name the threaders; incremented by every reset()
        self.index = 0
        # Aborted threaders that still need to be shut down
        self.abandoned = []
        self._workerhandler = worker
        # Remembered so that reset() builds threaders with the same number
        # of workers instead of falling back to BackgroundThreader's default
        self._worker_count = worker_count
        self.threader = self._new_threader()

    def _new_threader(self):
        """
        Build a BackgroundThreader with the configured worker class and
        worker count, named after the current index.
        """
        return BackgroundThreader(name=str(self.index),
                                  worker=self._workerhandler,
                                  worker_count=self._worker_count)

    def __getattr__(self, name):
        # Only called for attributes not found on the manager itself.
        # Without this guard a missing `threader` attribute would make the
        # lookup below call __getattr__ again without end
        if name == 'threader':
            raise AttributeError(name)
        return getattr(self.threader, name)

    def reset(self):
        """
        Abort the current threader and replace it with a new one. Does
        nothing if the current threader has neither queued nor running
        tasks.
        """
        if self.threader._queue.empty() and not self.threader.hasTask():
            return
        self.index += 1
        self.abandoned.append(self.threader.abort())
        self.threader = self._new_threader()

    def shutdown(self):
        """
        Shut down the current threader and all abandoned ones.
        """
        self.threader.shutdown()
        for a in self.abandoned:
            a.shutdown()
# Module-wide ThreaderManager instance; Task.start() submits tasks through it
BGThreader = ThreaderManager()