Use a dedicated full sync thread manager
- Prevents threads from restarting all the time
This commit is contained in:
parent
7180595e05
commit
3123af6426
2 changed files with 27 additions and 7 deletions
|
@ -94,7 +94,7 @@ class Tasks(list):
|
||||||
self.pop().cancel()
|
self.pop().cancel()
|
||||||
|
|
||||||
|
|
||||||
class Task:
|
class Task(object):
|
||||||
def __init__(self, priority=None):
|
def __init__(self, priority=None):
|
||||||
self._priority = priority
|
self._priority = priority
|
||||||
self._canceled = False
|
self._canceled = False
|
||||||
|
@ -141,7 +141,7 @@ class MutablePriorityQueue(Queue.PriorityQueue):
|
||||||
return lowest
|
return lowest
|
||||||
|
|
||||||
|
|
||||||
class BackgroundWorker:
|
class BackgroundWorker(object):
|
||||||
def __init__(self, queue, name=None):
|
def __init__(self, queue, name=None):
|
||||||
self._queue = queue
|
self._queue = queue
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -200,14 +200,30 @@ class BackgroundWorker:
|
||||||
return self._thread and self._thread.isAlive()
|
return self._thread and self._thread.isAlive()
|
||||||
|
|
||||||
|
|
||||||
|
class NonstoppingBackgroundWorker(BackgroundWorker):
|
||||||
|
def _queueLoop(self):
|
||||||
|
if self._queue.empty():
|
||||||
|
return
|
||||||
|
|
||||||
|
LOG.debug('(%s): Active', self.name)
|
||||||
|
while not self.aborted():
|
||||||
|
try:
|
||||||
|
self._task = self._queue.get_nowait()
|
||||||
|
self._runTask(self._task)
|
||||||
|
self._queue.task_done()
|
||||||
|
self._task = None
|
||||||
|
except Queue.Empty:
|
||||||
|
xbmc.sleep(50)
|
||||||
|
|
||||||
|
|
||||||
class BackgroundThreader:
|
class BackgroundThreader:
|
||||||
def __init__(self, name=None,
|
def __init__(self, name=None, worker,
|
||||||
worker_count=int(utils.settings('syncThreadNumber'))):
|
worker_count=int(utils.settings('syncThreadNumber'))):
|
||||||
self.name = name
|
self.name = name
|
||||||
self._queue = MutablePriorityQueue()
|
self._queue = MutablePriorityQueue()
|
||||||
self._abort = False
|
self._abort = False
|
||||||
self._priority = -1
|
self._priority = -1
|
||||||
self.workers = [BackgroundWorker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)]
|
self.workers = [worker(self._queue, 'queue.{0}:worker.{1}'.format(self.name, x)) for x in range(worker_count)]
|
||||||
|
|
||||||
def _nextPriority(self):
|
def _nextPriority(self):
|
||||||
self._priority += 1
|
self._priority += 1
|
||||||
|
@ -279,10 +295,11 @@ class BackgroundThreader:
|
||||||
|
|
||||||
|
|
||||||
class ThreaderManager:
|
class ThreaderManager:
|
||||||
def __init__(self):
|
def __init__(self, worker=BackgroundWorker):
|
||||||
self.index = 0
|
self.index = 0
|
||||||
self.abandoned = []
|
self.abandoned = []
|
||||||
self.threader = BackgroundThreader(str(self.index))
|
self.threader = BackgroundThreader(name=str(self.index),
|
||||||
|
worker=worker)
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
return getattr(self.threader, name)
|
return getattr(self.threader, name)
|
||||||
|
|
|
@ -37,6 +37,8 @@ class FullSync(common.libsync_mixin):
|
||||||
self.section_type = None
|
self.section_type = None
|
||||||
self.processing_thread = None
|
self.processing_thread = None
|
||||||
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
||||||
|
self.threader = backgroundthread.ThreaderManager(
|
||||||
|
worker=backgroundthread.NonstoppingBackgroundWorker)
|
||||||
super(FullSync, self).__init__()
|
super(FullSync, self).__init__()
|
||||||
|
|
||||||
def process_item(self, xml_item):
|
def process_item(self, xml_item):
|
||||||
|
@ -54,7 +56,7 @@ class FullSync(common.libsync_mixin):
|
||||||
return
|
return
|
||||||
task = GetMetadataTask()
|
task = GetMetadataTask()
|
||||||
task.setup(self.queue, plex_id, self.get_children)
|
task.setup(self.queue, plex_id, self.get_children)
|
||||||
backgroundthread.BGThreader.addTask(task)
|
self.threader.addTask(task)
|
||||||
|
|
||||||
def process_delete(self):
|
def process_delete(self):
|
||||||
"""
|
"""
|
||||||
|
@ -206,6 +208,7 @@ class FullSync(common.libsync_mixin):
|
||||||
LOG.debug('Waiting for processing thread to exit')
|
LOG.debug('Waiting for processing thread to exit')
|
||||||
self.processing_thread.join()
|
self.processing_thread.join()
|
||||||
common.update_kodi_library(video=True, music=True)
|
common.update_kodi_library(video=True, music=True)
|
||||||
|
self.threader.shutdown()
|
||||||
if self.callback:
|
if self.callback:
|
||||||
self.callback(successful)
|
self.callback(successful)
|
||||||
LOG.info('Done full_sync')
|
LOG.info('Done full_sync')
|
||||||
|
|
Loading…
Reference in a new issue