466 lines
13 KiB
Python
466 lines
13 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
from logging import getLogger
|
|
import threading
|
|
import Queue
|
|
import heapq
|
|
import xbmc
|
|
|
|
from . import utils, app
|
|
|
|
LOG = getLogger('PLEX.threads')
|
|
|
|
|
|
class KillableThread(threading.Thread):
|
|
'''A thread class that supports raising exception in the thread from
|
|
another thread.
|
|
'''
|
|
# def _get_my_tid(self):
|
|
# """determines this (self's) thread id
|
|
|
|
# CAREFUL : this function is executed in the context of the caller
|
|
# thread, to get the identity of the thread represented by this
|
|
# instance.
|
|
# """
|
|
# if not self.isAlive():
|
|
# raise threading.ThreadError("the thread is not active")
|
|
|
|
# return self.ident
|
|
|
|
# def _raiseExc(self, exctype):
|
|
# """Raises the given exception type in the context of this thread.
|
|
|
|
# If the thread is busy in a system call (time.sleep(),
|
|
# socket.accept(), ...), the exception is simply ignored.
|
|
|
|
# If you are sure that your exception should terminate the thread,
|
|
# one way to ensure that it works is:
|
|
|
|
# t = ThreadWithExc( ... )
|
|
# ...
|
|
# t.raiseExc( SomeException )
|
|
# while t.isAlive():
|
|
# time.sleep( 0.1 )
|
|
# t.raiseExc( SomeException )
|
|
|
|
# If the exception is to be caught by the thread, you need a way to
|
|
# check that your thread has caught it.
|
|
|
|
# CAREFUL : this function is executed in the context of the
|
|
# caller thread, to raise an excpetion in the context of the
|
|
# thread represented by this instance.
|
|
# """
|
|
# _async_raise(self._get_my_tid(), exctype)
|
|
|
|
def kill(self, force_and_wait=False):
|
|
pass
|
|
# try:
|
|
# self._raiseExc(KillThreadException)
|
|
|
|
# if force_and_wait:
|
|
# time.sleep(0.1)
|
|
# while self.isAlive():
|
|
# self._raiseExc(KillThreadException)
|
|
# time.sleep(0.1)
|
|
# except threading.ThreadError:
|
|
# pass
|
|
|
|
# def onKilled(self):
|
|
# pass
|
|
|
|
# def run(self):
|
|
# try:
|
|
# self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
|
|
# except KillThreadException:
|
|
# self.onKilled()
|
|
|
|
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
|
|
self._canceled = False
|
|
# Set to True to set the thread to suspended
|
|
self._suspended = False
|
|
# Thread will return True only if suspended state is reached
|
|
self.suspend_reached = False
|
|
super(KillableThread, self).__init__(group, target, name, args, kwargs)
|
|
|
|
def isCanceled(self):
|
|
"""
|
|
Returns True if the thread is stopped
|
|
"""
|
|
if self._canceled or xbmc.abortRequested:
|
|
return True
|
|
return False
|
|
|
|
def abort(self):
|
|
"""
|
|
Call to stop this thread
|
|
"""
|
|
self._canceled = True
|
|
|
|
def suspend(self, block=False):
|
|
"""
|
|
Call to suspend this thread
|
|
"""
|
|
self._suspended = True
|
|
if block:
|
|
while not self.suspend_reached:
|
|
LOG.debug('Waiting for thread to suspend: %s', self)
|
|
if app.APP.monitor.waitForAbort(0.1):
|
|
return
|
|
|
|
def resume(self):
|
|
"""
|
|
Call to revive a suspended thread back to life
|
|
"""
|
|
self._suspended = False
|
|
|
|
def wait_while_suspended(self):
|
|
"""
|
|
Blocks until thread is not suspended anymore or the thread should
|
|
exit.
|
|
Returns True only if the thread should exit (=isCanceled())
|
|
"""
|
|
while self.isSuspended():
|
|
try:
|
|
self.suspend_reached = True
|
|
# Set in service.py
|
|
if self.isCanceled():
|
|
# Abort was requested while waiting. We should exit
|
|
return True
|
|
if app.APP.monitor.waitForAbort(0.1):
|
|
return True
|
|
finally:
|
|
self.suspend_reached = False
|
|
return self.isCanceled()
|
|
|
|
def isSuspended(self):
|
|
"""
|
|
Returns True if the thread is suspended
|
|
"""
|
|
return self._suspended
|
|
|
|
|
|
class OrderedQueue(Queue.PriorityQueue, object):
|
|
"""
|
|
Queue that enforces an order on the items it returns. An item you push
|
|
onto the queue must be a tuple
|
|
(index, item)
|
|
where index=-1 is the item that will be returned first. The Queue will block
|
|
until index=-1, 0, 1, 2, 3, ... is then made available
|
|
"""
|
|
def __init__(self, maxsize=0):
|
|
super(OrderedQueue, self).__init__(maxsize)
|
|
self.smallest = -1
|
|
self.not_next_item = threading.Condition(self.mutex)
|
|
|
|
def _put(self, item, heappush=heapq.heappush):
|
|
heappush(self.queue, item)
|
|
if item[0] == self.smallest:
|
|
self.not_next_item.notify()
|
|
|
|
def get(self, block=True, timeout=None):
|
|
"""Remove and return an item from the queue.
|
|
|
|
If optional args 'block' is true and 'timeout' is None (the default),
|
|
block if necessary until an item is available. If 'timeout' is
|
|
a non-negative number, it blocks at most 'timeout' seconds and raises
|
|
the Empty exception if no item was available within that time.
|
|
Otherwise ('block' is false), return an item if one is immediately
|
|
available, else raise the Empty exception ('timeout' is ignored
|
|
in that case).
|
|
"""
|
|
self.not_empty.acquire()
|
|
try:
|
|
if not block:
|
|
if not self._qsize() or self.queue[0][0] != self.smallest:
|
|
raise Queue.Empty
|
|
elif timeout is None:
|
|
while not self._qsize():
|
|
self.not_empty.wait()
|
|
while self.queue[0][0] != self.smallest:
|
|
self.not_next_item.wait()
|
|
elif timeout < 0:
|
|
raise ValueError("'timeout' must be a non-negative number")
|
|
else:
|
|
endtime = Queue._time() + timeout
|
|
while not self._qsize():
|
|
remaining = endtime - Queue._time()
|
|
if remaining <= 0.0:
|
|
raise Queue.Empty
|
|
self.not_empty.wait(remaining)
|
|
while self.queue[0][0] != self.smallest:
|
|
remaining = endtime - Queue._time()
|
|
if remaining <= 0.0:
|
|
raise Queue.Empty
|
|
self.not_next_item.wait(remaining)
|
|
item = self._get()
|
|
self.smallest += 1
|
|
self.not_full.notify()
|
|
return item
|
|
finally:
|
|
self.not_empty.release()
|
|
|
|
|
|
class Tasks(list):
|
|
def add(self, task):
|
|
for t in self:
|
|
if not t.isValid():
|
|
self.remove(t)
|
|
|
|
if isinstance(task, list):
|
|
self += task
|
|
else:
|
|
self.append(task)
|
|
|
|
def cancel(self):
|
|
while self:
|
|
self.pop().cancel()
|
|
|
|
|
|
class Task(object):
|
|
def __init__(self, priority=None):
|
|
self.priority = priority
|
|
self._canceled = False
|
|
self.finished = False
|
|
|
|
def __cmp__(self, other):
|
|
return self.priority - other.priority
|
|
|
|
def start(self):
|
|
BGThreader.addTask(self)
|
|
|
|
def _run(self):
|
|
self.run()
|
|
self.finished = True
|
|
|
|
def run(self):
|
|
raise NotImplementedError
|
|
|
|
def cancel(self):
|
|
self._canceled = True
|
|
|
|
def isCanceled(self):
|
|
return self._canceled or xbmc.abortRequested
|
|
|
|
def isValid(self):
|
|
return not self.finished and not self._canceled
|
|
|
|
|
|
class FunctionAsTask(Task):
|
|
def __init__(self, function, callback, *args, **kwargs):
|
|
self._function = function
|
|
self._callback = callback
|
|
self._args = args
|
|
self._kwargs = kwargs
|
|
super(FunctionAsTask, self).__init__()
|
|
|
|
def run(self):
|
|
result = self._function(*self._args, **self._kwargs)
|
|
if self._callback:
|
|
self._callback(result)
|
|
|
|
|
|
class MutablePriorityQueue(Queue.PriorityQueue):
|
|
def _get(self, heappop=heapq.heappop):
|
|
self.queue.sort()
|
|
return heappop(self.queue)
|
|
|
|
def lowest(self):
|
|
"""Return the lowest priority item in the queue (not reliable!)."""
|
|
self.mutex.acquire()
|
|
try:
|
|
lowest = self.queue and min(self.queue) or None
|
|
except Exception:
|
|
lowest = None
|
|
utils.ERROR()
|
|
finally:
|
|
self.mutex.release()
|
|
return lowest
|
|
|
|
|
|
class BackgroundWorker(object):
|
|
def __init__(self, queue, name=None):
|
|
self._queue = queue
|
|
self.name = name
|
|
self._thread = None
|
|
self._abort = False
|
|
self._task = None
|
|
|
|
@staticmethod
|
|
def _runTask(task):
|
|
if task._canceled:
|
|
return
|
|
try:
|
|
task._run()
|
|
except Exception:
|
|
utils.ERROR()
|
|
|
|
def abort(self):
|
|
self._abort = True
|
|
return self
|
|
|
|
def aborted(self):
|
|
return self._abort or xbmc.abortRequested
|
|
|
|
def start(self):
|
|
if self._thread and self._thread.isAlive():
|
|
return
|
|
|
|
self._thread = KillableThread(target=self._queueLoop, name='BACKGROUND-WORKER({0})'.format(self.name))
|
|
self._thread.start()
|
|
|
|
def _queueLoop(self):
|
|
if self._queue.empty():
|
|
return
|
|
|
|
LOG.debug('(%s): Active', self.name)
|
|
try:
|
|
while not self.aborted():
|
|
self._task = self._queue.get_nowait()
|
|
self._runTask(self._task)
|
|
self._queue.task_done()
|
|
self._task = None
|
|
except Queue.Empty:
|
|
LOG.debug('(%s): Idle', self.name)
|
|
|
|
def shutdown(self):
|
|
self.abort()
|
|
|
|
if self._task:
|
|
self._task.cancel()
|
|
|
|
if self._thread and self._thread.isAlive():
|
|
LOG.debug('thread (%s): Waiting...', self.name)
|
|
self._thread.join()
|
|
LOG.debug('thread (%s): Done', self.name)
|
|
|
|
def working(self):
|
|
return self._thread and self._thread.isAlive()
|
|
|
|
|
|
class NonstoppingBackgroundWorker(BackgroundWorker):
|
|
def __init__(self, queue, name=None):
|
|
self._working = False
|
|
super(NonstoppingBackgroundWorker, self).__init__(queue, name)
|
|
|
|
def _queueLoop(self):
|
|
while not self.aborted():
|
|
try:
|
|
self._task = self._queue.get_nowait()
|
|
self._working = True
|
|
self._runTask(self._task)
|
|
self._working = False
|
|
self._queue.task_done()
|
|
self._task = None
|
|
except Queue.Empty:
|
|
app.APP.monitor.waitForAbort(0.05)
|
|
|
|
def working(self):
|
|
return self._working
|
|
|
|
|
|
class BackgroundThreader:
|
|
def __init__(self, name=None, worker=BackgroundWorker, worker_count=6):
|
|
self.name = name
|
|
self._queue = MutablePriorityQueue()
|
|
self._abort = False
|
|
self.priority = -1
|
|
self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)]
|
|
|
|
def _nextPriority(self):
|
|
self.priority += 1
|
|
return self.priority
|
|
|
|
def abort(self):
|
|
self._abort = True
|
|
for w in self.workers:
|
|
w.abort()
|
|
return self
|
|
|
|
def aborted(self):
|
|
return self._abort or xbmc.abortRequested
|
|
|
|
def shutdown(self):
|
|
self.abort()
|
|
|
|
for w in self.workers:
|
|
w.shutdown()
|
|
|
|
def addTask(self, task):
|
|
task.priority = self._nextPriority()
|
|
self._queue.put(task)
|
|
self.startWorkers()
|
|
|
|
def addTasks(self, tasks):
|
|
for t in tasks:
|
|
t.priority = self._nextPriority()
|
|
self._queue.put(t)
|
|
|
|
self.startWorkers()
|
|
|
|
def addTasksToFront(self, tasks):
|
|
lowest = self.getLowestPrority()
|
|
if lowest is None:
|
|
return self.addTasks(tasks)
|
|
|
|
p = lowest - len(tasks)
|
|
for t in tasks:
|
|
t.priority = p
|
|
self._queue.put(t)
|
|
p += 1
|
|
|
|
self.startWorkers()
|
|
|
|
def startWorkers(self):
|
|
for w in self.workers:
|
|
w.start()
|
|
|
|
def working(self):
|
|
return not self._queue.empty() or self.hasTask()
|
|
|
|
def hasTask(self):
|
|
return any([w.working() for w in self.workers])
|
|
|
|
def getLowestPrority(self):
|
|
lowest = self._queue.lowest()
|
|
if not lowest:
|
|
return None
|
|
|
|
return lowest.priority
|
|
|
|
def moveToFront(self, qitem):
|
|
lowest = self.getLowestPrority()
|
|
if lowest is None:
|
|
return
|
|
|
|
qitem.priority = lowest - 1
|
|
|
|
|
|
class ThreaderManager:
|
|
def __init__(self, worker=BackgroundWorker, worker_count=6):
|
|
self.index = 0
|
|
self.abandoned = []
|
|
self._workerhandler = worker
|
|
self.threader = BackgroundThreader(name=str(self.index),
|
|
worker=worker,
|
|
worker_count=worker_count)
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self.threader, name)
|
|
|
|
def reset(self):
|
|
if self.threader._queue.empty() and not self.threader.hasTask():
|
|
return
|
|
|
|
self.index += 1
|
|
self.abandoned.append(self.threader.abort())
|
|
self.threader = BackgroundThreader(name=str(self.index),
|
|
worker=self._workerhandler)
|
|
|
|
def shutdown(self):
|
|
self.threader.shutdown()
|
|
for a in self.abandoned:
|
|
a.shutdown()
|
|
|
|
|
|
BGThreader = ThreaderManager()
|