Merge pull request #1073 from croneter/fix-userswitch
Optimize threads by using events instead of a polling mechanism. Fixes PKC becoming unresponsive, e.g. when switching users
This commit is contained in:
commit
f4ea051c81
15 changed files with 194 additions and 243 deletions
|
@ -124,19 +124,16 @@ class App(object):
|
|||
if block:
|
||||
while True:
|
||||
for thread in self.threads:
|
||||
if not thread.suspend_reached:
|
||||
if not thread.is_suspended():
|
||||
LOG.debug('Waiting for thread to suspend: %s', thread)
|
||||
# Send suspend signal again in case self.threads
|
||||
# changed
|
||||
thread.suspend()
|
||||
if self.monitor.waitForAbort(0.1):
|
||||
return True
|
||||
break
|
||||
thread.suspend(block=True)
|
||||
else:
|
||||
break
|
||||
return xbmc.abortRequested
|
||||
|
||||
def resume_threads(self, block=True):
|
||||
def resume_threads(self):
|
||||
"""
|
||||
Resume all thread activity with or without blocking.
|
||||
Returns True only if PKC shutdown requested
|
||||
|
@ -144,16 +141,6 @@ class App(object):
|
|||
LOG.debug('Resuming threads: %s', self.threads)
|
||||
for thread in self.threads:
|
||||
thread.resume()
|
||||
if block:
|
||||
while True:
|
||||
for thread in self.threads:
|
||||
if thread.suspend_reached:
|
||||
LOG.debug('Waiting for thread to resume: %s', thread)
|
||||
if self.monitor.waitForAbort(0.1):
|
||||
return True
|
||||
break
|
||||
else:
|
||||
break
|
||||
return xbmc.abortRequested
|
||||
|
||||
def stop_threads(self, block=True):
|
||||
|
@ -163,7 +150,7 @@ class App(object):
|
|||
"""
|
||||
LOG.debug('Killing threads: %s', self.threads)
|
||||
for thread in self.threads:
|
||||
thread.abort()
|
||||
thread.cancel()
|
||||
if block:
|
||||
while self.threads:
|
||||
LOG.debug('Waiting for threads to exit: %s', self.threads)
|
||||
|
|
|
@ -33,8 +33,8 @@ class ImageCachingThread(backgroundthread.KillableThread):
|
|||
if not utils.settings('imageSyncDuringPlayback') == 'true':
|
||||
self.suspend_points.append((app.APP, 'is_playing_video'))
|
||||
|
||||
def isSuspended(self):
|
||||
return any(getattr(obj, txt) for obj, txt in self.suspend_points)
|
||||
def should_suspend(self):
|
||||
return any(getattr(obj, attrib) for obj, attrib in self.suspend_points)
|
||||
|
||||
@staticmethod
|
||||
def _url_generator(kind, kodi_type):
|
||||
|
@ -73,18 +73,26 @@ class ImageCachingThread(backgroundthread.KillableThread):
|
|||
app.APP.deregister_caching_thread(self)
|
||||
LOG.info("---===### Stopped ImageCachingThread ###===---")
|
||||
|
||||
def _run(self):
|
||||
def _loop(self):
|
||||
kinds = [KodiVideoDB]
|
||||
if app.SYNC.enable_music:
|
||||
kinds.append(KodiMusicDB)
|
||||
for kind in kinds:
|
||||
for kodi_type in ('poster', 'fanart'):
|
||||
for url in self._url_generator(kind, kodi_type):
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
if self.should_suspend() or self.should_cancel():
|
||||
return False
|
||||
cache_url(url)
|
||||
# Toggles Image caching completed to Yes
|
||||
utils.settings('plex_status_image_caching', value=utils.lang(107))
|
||||
return True
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
if self._loop():
|
||||
break
|
||||
if self.wait_while_suspended():
|
||||
break
|
||||
|
||||
|
||||
def cache_url(url):
|
||||
|
|
|
@ -13,131 +13,95 @@ LOG = getLogger('PLEX.threads')
|
|||
|
||||
|
||||
class KillableThread(threading.Thread):
|
||||
'''A thread class that supports raising exception in the thread from
|
||||
another thread.
|
||||
'''
|
||||
# def _get_my_tid(self):
|
||||
# """determines this (self's) thread id
|
||||
|
||||
# CAREFUL : this function is executed in the context of the caller
|
||||
# thread, to get the identity of the thread represented by this
|
||||
# instance.
|
||||
# """
|
||||
# if not self.isAlive():
|
||||
# raise threading.ThreadError("the thread is not active")
|
||||
|
||||
# return self.ident
|
||||
|
||||
# def _raiseExc(self, exctype):
|
||||
# """Raises the given exception type in the context of this thread.
|
||||
|
||||
# If the thread is busy in a system call (time.sleep(),
|
||||
# socket.accept(), ...), the exception is simply ignored.
|
||||
|
||||
# If you are sure that your exception should terminate the thread,
|
||||
# one way to ensure that it works is:
|
||||
|
||||
# t = ThreadWithExc( ... )
|
||||
# ...
|
||||
# t.raiseExc( SomeException )
|
||||
# while t.isAlive():
|
||||
# time.sleep( 0.1 )
|
||||
# t.raiseExc( SomeException )
|
||||
|
||||
# If the exception is to be caught by the thread, you need a way to
|
||||
# check that your thread has caught it.
|
||||
|
||||
# CAREFUL : this function is executed in the context of the
|
||||
# caller thread, to raise an excpetion in the context of the
|
||||
# thread represented by this instance.
|
||||
# """
|
||||
# _async_raise(self._get_my_tid(), exctype)
|
||||
|
||||
def kill(self, force_and_wait=False):
|
||||
pass
|
||||
# try:
|
||||
# self._raiseExc(KillThreadException)
|
||||
|
||||
# if force_and_wait:
|
||||
# time.sleep(0.1)
|
||||
# while self.isAlive():
|
||||
# self._raiseExc(KillThreadException)
|
||||
# time.sleep(0.1)
|
||||
# except threading.ThreadError:
|
||||
# pass
|
||||
|
||||
# def onKilled(self):
|
||||
# pass
|
||||
|
||||
# def run(self):
|
||||
# try:
|
||||
# self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
|
||||
# except KillThreadException:
|
||||
# self.onKilled()
|
||||
|
||||
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
|
||||
self._canceled = False
|
||||
# Set to True to set the thread to suspended
|
||||
self._suspended = False
|
||||
# Thread will return True only if suspended state is reached
|
||||
self.suspend_reached = False
|
||||
self._is_not_suspended = threading.Event()
|
||||
self._is_not_suspended.set()
|
||||
self._suspension_reached = threading.Event()
|
||||
self._is_not_asleep = threading.Event()
|
||||
self._is_not_asleep.set()
|
||||
self.suspension_timeout = None
|
||||
super(KillableThread, self).__init__(group, target, name, args, kwargs)
|
||||
|
||||
def isCanceled(self):
|
||||
def should_cancel(self):
|
||||
"""
|
||||
Returns True if the thread is stopped
|
||||
Returns True if the thread should be stopped immediately
|
||||
"""
|
||||
if self._canceled or xbmc.abortRequested:
|
||||
return True
|
||||
return False
|
||||
return self._canceled or app.APP.stop_pkc
|
||||
|
||||
def abort(self):
|
||||
def cancel(self):
|
||||
"""
|
||||
Call to stop this thread
|
||||
Call from another thread to stop this current thread
|
||||
"""
|
||||
self._canceled = True
|
||||
# Make sure thread is running in order to exit quickly
|
||||
self._is_not_suspended.set()
|
||||
self._is_not_asleep.set()
|
||||
|
||||
def suspend(self, block=False):
|
||||
def should_suspend(self):
|
||||
"""
|
||||
Call to suspend this thread
|
||||
Returns True if the current thread should be suspended immediately
|
||||
"""
|
||||
return self._suspended
|
||||
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Call from another thread to suspend the current thread. Provide a
|
||||
timeout [float] in seconds optionally. block=True will block the caller
|
||||
until the thread-to-be-suspended is indeed suspended
|
||||
Will wake a thread that is asleep!
|
||||
"""
|
||||
self.suspension_timeout = timeout
|
||||
self._suspended = True
|
||||
self._is_not_suspended.clear()
|
||||
# Make sure thread wakes up in order to suspend
|
||||
self._is_not_asleep.set()
|
||||
if block:
|
||||
while not self.suspend_reached:
|
||||
LOG.debug('Waiting for thread to suspend: %s', self)
|
||||
if app.APP.monitor.waitForAbort(0.1):
|
||||
return
|
||||
self._suspension_reached.wait()
|
||||
|
||||
def resume(self):
|
||||
"""
|
||||
Call to revive a suspended thread back to life
|
||||
Call from another thread to revive a suspended or asleep current thread
|
||||
back to life
|
||||
"""
|
||||
self._suspended = False
|
||||
self._is_not_suspended.set()
|
||||
self._is_not_asleep.set()
|
||||
|
||||
def wait_while_suspended(self):
|
||||
"""
|
||||
Blocks until thread is not suspended anymore or the thread should
|
||||
exit.
|
||||
Returns True only if the thread should exit (=isCanceled())
|
||||
exit or for a period of self.suspension_timeout (set by the caller of
|
||||
suspend())
|
||||
Returns the value of should_cancel()
|
||||
"""
|
||||
while self.isSuspended():
|
||||
try:
|
||||
self.suspend_reached = True
|
||||
# Set in service.py
|
||||
if self.isCanceled():
|
||||
# Abort was requested while waiting. We should exit
|
||||
return True
|
||||
if app.APP.monitor.waitForAbort(0.1):
|
||||
return True
|
||||
finally:
|
||||
self.suspend_reached = False
|
||||
return self.isCanceled()
|
||||
self._suspension_reached.set()
|
||||
self._is_not_suspended.wait(self.suspension_timeout)
|
||||
self._suspension_reached.clear()
|
||||
return self.should_cancel()
|
||||
|
||||
def isSuspended(self):
|
||||
def is_suspended(self):
|
||||
"""
|
||||
Returns True if the thread is suspended
|
||||
Check from another thread whether the current thread is suspended
|
||||
"""
|
||||
return self._suspended
|
||||
return self._suspension_reached.is_set()
|
||||
|
||||
def sleep(self, timeout):
|
||||
"""
|
||||
Only call from the current thread in order to sleep for a period of
|
||||
timeout [float, seconds]. Will unblock immediately if thread should
|
||||
cancel (should_cancel()) or the thread should_suspend
|
||||
"""
|
||||
self._is_not_asleep.clear()
|
||||
self._is_not_asleep.wait(timeout)
|
||||
self._is_not_asleep.set()
|
||||
|
||||
def is_asleep(self):
|
||||
"""
|
||||
Check from another thread whether the current thread is asleep
|
||||
"""
|
||||
return not self._is_not_asleep.is_set()
|
||||
|
||||
|
||||
class OrderedQueue(Queue.PriorityQueue, object):
|
||||
|
@ -239,7 +203,7 @@ class Task(object):
|
|||
def cancel(self):
|
||||
self._canceled = True
|
||||
|
||||
def isCanceled(self):
|
||||
def should_cancel(self):
|
||||
return self._canceled or xbmc.abortRequested
|
||||
|
||||
def isValid(self):
|
||||
|
|
|
@ -9,34 +9,6 @@ PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and
|
|||
utils.settings('enablePlaylistSync') == 'true')
|
||||
|
||||
|
||||
class fullsync_mixin(object):
|
||||
def __init__(self):
|
||||
self._canceled = False
|
||||
|
||||
def abort(self):
|
||||
"""Hit method to terminate the thread"""
|
||||
self._canceled = True
|
||||
# Let's NOT suspend sync threads but immediately terminate them
|
||||
suspend = abort
|
||||
|
||||
@property
|
||||
def suspend_reached(self):
|
||||
"""Since we're not suspending, we'll never set it to True"""
|
||||
return False
|
||||
|
||||
@suspend_reached.setter
|
||||
def suspend_reached(self):
|
||||
pass
|
||||
|
||||
def resume(self):
|
||||
"""Obsolete since we're not suspending"""
|
||||
pass
|
||||
|
||||
def isCanceled(self):
|
||||
"""Check whether we should exit this thread"""
|
||||
return self._canceled
|
||||
|
||||
|
||||
def update_kodi_library(video=True, music=True):
|
||||
"""
|
||||
Updates the Kodi library and thus refreshes the Kodi views and widgets
|
||||
|
|
|
@ -27,48 +27,51 @@ class FanartThread(backgroundthread.KillableThread):
|
|||
self.refresh = refresh
|
||||
super(FanartThread, self).__init__()
|
||||
|
||||
def isSuspended(self):
|
||||
def should_suspend(self):
|
||||
return self._suspended or app.APP.is_playing_video
|
||||
|
||||
def run(self):
|
||||
LOG.info('Starting FanartThread')
|
||||
app.APP.register_fanart_thread(self)
|
||||
try:
|
||||
self._run_internal()
|
||||
self._run()
|
||||
except Exception:
|
||||
utils.ERROR(notify=True)
|
||||
finally:
|
||||
app.APP.deregister_fanart_thread(self)
|
||||
|
||||
def _run_internal(self):
|
||||
def _loop(self):
|
||||
for typus in SUPPORTED_TYPES:
|
||||
offset = 0
|
||||
while True:
|
||||
with PlexDB() as plexdb:
|
||||
# Keep DB connection open only for a short period of time!
|
||||
if self.refresh:
|
||||
batch = list(plexdb.every_plex_id(typus,
|
||||
offset,
|
||||
BATCH_SIZE))
|
||||
else:
|
||||
batch = list(plexdb.missing_fanart(typus,
|
||||
offset,
|
||||
BATCH_SIZE))
|
||||
for plex_id in batch:
|
||||
# Do the actual, time-consuming processing
|
||||
if self.should_suspend() or self.should_cancel():
|
||||
return False
|
||||
process_fanart(plex_id, typus, self.refresh)
|
||||
if len(batch) < BATCH_SIZE:
|
||||
break
|
||||
offset += BATCH_SIZE
|
||||
return True
|
||||
|
||||
def _run(self):
|
||||
finished = False
|
||||
try:
|
||||
for typus in SUPPORTED_TYPES:
|
||||
offset = 0
|
||||
while True:
|
||||
with PlexDB() as plexdb:
|
||||
# Keep DB connection open only for a short period of time!
|
||||
if self.refresh:
|
||||
batch = list(plexdb.every_plex_id(typus,
|
||||
offset,
|
||||
BATCH_SIZE))
|
||||
else:
|
||||
batch = list(plexdb.missing_fanart(typus,
|
||||
offset,
|
||||
BATCH_SIZE))
|
||||
for plex_id in batch:
|
||||
# Do the actual, time-consuming processing
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
process_fanart(plex_id, typus, self.refresh)
|
||||
if len(batch) < BATCH_SIZE:
|
||||
break
|
||||
offset += BATCH_SIZE
|
||||
else:
|
||||
finished = True
|
||||
finally:
|
||||
LOG.info('FanartThread finished: %s', finished)
|
||||
self.callback(finished)
|
||||
while not finished:
|
||||
finished = self._loop()
|
||||
if self.wait_while_suspended():
|
||||
break
|
||||
LOG.info('FanartThread finished: %s', finished)
|
||||
self.callback(finished)
|
||||
|
||||
|
||||
class FanartTask(backgroundthread.Task):
|
||||
|
|
|
@ -39,7 +39,7 @@ class InitNewSection(object):
|
|||
self.plex_type = plex_type
|
||||
|
||||
|
||||
class FullSync(common.fullsync_mixin):
|
||||
class FullSync(backgroundthread.KillableThread):
|
||||
def __init__(self, repair, callback, show_dialog):
|
||||
"""
|
||||
repair=True: force sync EVERY item
|
||||
|
@ -75,6 +75,12 @@ class FullSync(common.fullsync_mixin):
|
|||
worker_count=self.worker_count)
|
||||
super(FullSync, self).__init__()
|
||||
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Let's NOT suspend sync threads but immediately terminate them
|
||||
"""
|
||||
self.cancel()
|
||||
|
||||
def update_progressbar(self):
|
||||
if self.dialog:
|
||||
try:
|
||||
|
@ -112,7 +118,7 @@ class FullSync(common.fullsync_mixin):
|
|||
if not self.section:
|
||||
_, self.section = self.queue.get()
|
||||
self.queue.task_done()
|
||||
while not self.isCanceled() and self.item_count > 0:
|
||||
while not self.should_cancel() and self.item_count > 0:
|
||||
section = self.section
|
||||
if not section:
|
||||
break
|
||||
|
@ -124,12 +130,12 @@ class FullSync(common.fullsync_mixin):
|
|||
self.section_type_text = utils.lang(
|
||||
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
|
||||
with section.context(self.current_sync) as context:
|
||||
while not self.isCanceled() and self.item_count > 0:
|
||||
while not self.should_cancel() and self.item_count > 0:
|
||||
try:
|
||||
_, item = self.queue.get(block=False)
|
||||
except backgroundthread.Queue.Empty:
|
||||
if self.threader.threader.working():
|
||||
app.APP.monitor.waitForAbort(0.02)
|
||||
self.sleep(0.02)
|
||||
continue
|
||||
else:
|
||||
# Try again, in case a thread just finished
|
||||
|
@ -187,7 +193,7 @@ class FullSync(common.fullsync_mixin):
|
|||
# Check Plex DB to see what we need to add/update
|
||||
with PlexDB() as self.plexdb:
|
||||
for last, xml_item in loop:
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return False
|
||||
self.process_item(xml_item)
|
||||
if self.item_count == BATCH_SIZE:
|
||||
|
@ -227,7 +233,7 @@ class FullSync(common.fullsync_mixin):
|
|||
while True:
|
||||
with section.context(self.current_sync) as itemtype:
|
||||
for i, (last, xml_item) in enumerate(loop):
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return False
|
||||
if not itemtype.update_userdata(xml_item, section.plex_type):
|
||||
# Somehow did not sync this item yet
|
||||
|
@ -256,7 +262,7 @@ class FullSync(common.fullsync_mixin):
|
|||
for kind in kinds:
|
||||
for section in (x for x in app.SYNC.sections
|
||||
if x.section_type == kind[1]):
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
LOG.debug('Need to exit now')
|
||||
return
|
||||
if not section.sync_to_kodi:
|
||||
|
@ -332,7 +338,7 @@ class FullSync(common.fullsync_mixin):
|
|||
self.get_children = section.get_children
|
||||
self.queue = section.Queue()
|
||||
# Now do the heavy lifting
|
||||
if self.isCanceled() or not self.addupdate_section(section):
|
||||
if self.should_cancel() or not self.addupdate_section(section):
|
||||
return False
|
||||
if self.section_success:
|
||||
# Need to check because a thread might have missed to get
|
||||
|
@ -391,7 +397,7 @@ class FullSync(common.fullsync_mixin):
|
|||
self.context = section.context
|
||||
self.get_children = section.get_children
|
||||
# Now do the heavy lifting
|
||||
if self.isCanceled() or not self.playstate_per_section(section):
|
||||
if self.should_cancel() or not self.playstate_per_section(section):
|
||||
return False
|
||||
|
||||
# Delete movies that are not on Plex anymore
|
||||
|
@ -416,7 +422,7 @@ class FullSync(common.fullsync_mixin):
|
|||
self.current_sync,
|
||||
BATCH_SIZE))
|
||||
for plex_id in plex_ids:
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return False
|
||||
ctx.remove(plex_id, plex_type)
|
||||
if len(plex_ids) < BATCH_SIZE:
|
||||
|
@ -436,7 +442,7 @@ class FullSync(common.fullsync_mixin):
|
|||
def _run(self):
|
||||
self.current_sync = timing.plex_now()
|
||||
# Get latest Plex libraries and build playlist and video node files
|
||||
if self.isCanceled() or not sections.sync_from_pms(self):
|
||||
if self.should_cancel() or not sections.sync_from_pms(self):
|
||||
return
|
||||
self.successful = True
|
||||
try:
|
||||
|
@ -447,7 +453,7 @@ class FullSync(common.fullsync_mixin):
|
|||
# Actual syncing - do only new items first
|
||||
LOG.info('Running full_library_sync with repair=%s',
|
||||
self.repair)
|
||||
if self.isCanceled() or not self.full_library_sync():
|
||||
if self.should_cancel() or not self.full_library_sync():
|
||||
self.successful = False
|
||||
return
|
||||
finally:
|
||||
|
@ -457,7 +463,7 @@ class FullSync(common.fullsync_mixin):
|
|||
if self.threader:
|
||||
self.threader.shutdown()
|
||||
self.threader = None
|
||||
if not self.successful and not self.isCanceled():
|
||||
if not self.successful and not self.should_cancel():
|
||||
# "ERROR in library sync"
|
||||
utils.dialog('notification',
|
||||
heading='{plex}',
|
||||
|
@ -468,4 +474,5 @@ class FullSync(common.fullsync_mixin):
|
|||
|
||||
|
||||
def start(show_dialog, repair=False, callback=None):
|
||||
# Call run() and NOT start in order to not spawn another thread
|
||||
FullSync(repair, callback, show_dialog).run()
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
|
||||
from . import common
|
||||
from ..plex_api import API
|
||||
from .. import plex_functions as PF, backgroundthread, utils, variables as v
|
||||
|
||||
|
@ -27,7 +26,7 @@ def reset_collections():
|
|||
COLLECTION_XMLS = {}
|
||||
|
||||
|
||||
class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
||||
class GetMetadataTask(backgroundthread.Task):
|
||||
"""
|
||||
Threaded download of Plex XML metadata for a certain library item.
|
||||
Fills the queue with the downloaded etree XML objects
|
||||
|
@ -45,6 +44,12 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
self.count = count
|
||||
super(GetMetadataTask, self).__init__()
|
||||
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Let's NOT suspend sync threads but immediately terminate them
|
||||
"""
|
||||
self.cancel()
|
||||
|
||||
def _collections(self, item):
|
||||
global COLLECTION_MATCH, COLLECTION_XMLS
|
||||
api = API(item['xml'][0])
|
||||
|
@ -59,7 +64,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
utils.cast(int, x.get('ratingKey'))) for x in COLLECTION_MATCH]
|
||||
item['children'] = {}
|
||||
for plex_set_id, set_name in api.collections():
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return
|
||||
if plex_set_id not in COLLECTION_XMLS:
|
||||
# Get Plex metadata for collections - a pain
|
||||
|
@ -84,7 +89,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
"""
|
||||
Do the work
|
||||
"""
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return
|
||||
# Download Metadata
|
||||
item = {
|
||||
|
@ -101,7 +106,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
'Cancelling sync for now')
|
||||
utils.window('plex_scancrashed', value='401')
|
||||
return
|
||||
if not self.isCanceled() and self.plex_type == v.PLEX_TYPE_MOVIE:
|
||||
if not self.should_cancel() and self.plex_type == v.PLEX_TYPE_MOVIE:
|
||||
# Check for collections/sets
|
||||
collections = False
|
||||
for child in item['xml'][0]:
|
||||
|
@ -112,7 +117,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
global LOCK
|
||||
with LOCK:
|
||||
self._collections(item)
|
||||
if not self.isCanceled() and self.get_children:
|
||||
if not self.should_cancel() and self.get_children:
|
||||
children_xml = PF.GetAllPlexChildren(self.plex_id)
|
||||
try:
|
||||
children_xml[0].attrib
|
||||
|
@ -121,5 +126,5 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task):
|
|||
self.plex_id)
|
||||
else:
|
||||
item['children'] = children_xml
|
||||
if not self.isCanceled():
|
||||
if not self.should_cancel():
|
||||
self.queue.put((self.count, item))
|
||||
|
|
|
@ -16,7 +16,7 @@ LOG = getLogger('PLEX.sync.sections')
|
|||
|
||||
BATCH_SIZE = 500
|
||||
# Need a way to interrupt our synching process
|
||||
IS_CANCELED = None
|
||||
SHOULD_CANCEL = None
|
||||
|
||||
LIBRARY_PATH = path_ops.translate_path('special://profile/library/video/')
|
||||
# The video library might not yet exist for this user - create it
|
||||
|
@ -490,7 +490,7 @@ def _delete_kodi_db_items(section):
|
|||
with kodi_context(texture_db=True) as kodidb:
|
||||
typus = context(None, plexdb=plexdb, kodidb=kodidb)
|
||||
for plex_id in plex_ids:
|
||||
if IS_CANCELED():
|
||||
if SHOULD_CANCEL():
|
||||
return False
|
||||
typus.remove(plex_id)
|
||||
if len(plex_ids) < BATCH_SIZE:
|
||||
|
@ -582,13 +582,13 @@ def sync_from_pms(parent_self, pick_libraries=False):
|
|||
pick_libraries=True will prompt the user the select the libraries he
|
||||
wants to sync
|
||||
"""
|
||||
global IS_CANCELED
|
||||
global SHOULD_CANCEL
|
||||
LOG.info('Starting synching sections from the PMS')
|
||||
IS_CANCELED = parent_self.isCanceled
|
||||
SHOULD_CANCEL = parent_self.should_cancel
|
||||
try:
|
||||
return _sync_from_pms(pick_libraries)
|
||||
finally:
|
||||
IS_CANCELED = None
|
||||
SHOULD_CANCEL = None
|
||||
LOG.info('Done synching sections from the PMS: %s', app.SYNC.sections)
|
||||
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ SUPPORTED_FILETYPES = (
|
|||
###############################################################################
|
||||
|
||||
|
||||
def isCanceled():
|
||||
def should_cancel():
|
||||
return app.APP.stop_pkc or app.SYNC.stop_sync
|
||||
|
||||
|
||||
|
@ -179,7 +179,7 @@ def _full_sync():
|
|||
# before. If yes, make sure that hashes are identical. If not, sync it.
|
||||
old_plex_ids = db.plex_playlist_ids()
|
||||
for xml_playlist in xml:
|
||||
if isCanceled():
|
||||
if should_cancel():
|
||||
return False
|
||||
api = API(xml_playlist)
|
||||
try:
|
||||
|
@ -211,7 +211,7 @@ def _full_sync():
|
|||
LOG.info('Could not recreate playlist %s', api.plex_id)
|
||||
# Get rid of old Plex playlists that were deleted on the Plex side
|
||||
for plex_id in old_plex_ids:
|
||||
if isCanceled():
|
||||
if should_cancel():
|
||||
return False
|
||||
playlist = db.get_playlist(plex_id=plex_id)
|
||||
LOG.debug('Removing outdated Plex playlist from Kodi: %s', playlist)
|
||||
|
@ -225,7 +225,7 @@ def _full_sync():
|
|||
old_kodi_paths = db.kodi_playlist_paths()
|
||||
for root, _, files in path_ops.walk(v.PLAYLIST_PATH):
|
||||
for f in files:
|
||||
if isCanceled():
|
||||
if should_cancel():
|
||||
return False
|
||||
path = path_ops.path.join(root, f)
|
||||
try:
|
||||
|
@ -256,7 +256,7 @@ def _full_sync():
|
|||
except PlaylistError:
|
||||
LOG.info('Skipping Kodi playlist %s', path)
|
||||
for kodi_path in old_kodi_paths:
|
||||
if isCanceled():
|
||||
if should_cancel():
|
||||
return False
|
||||
playlist = db.get_playlist(path=kodi_path)
|
||||
if not playlist:
|
||||
|
|
|
@ -120,7 +120,7 @@ class PlayqueueMonitor(backgroundthread.KillableThread):
|
|||
# Ignore new media added by other addons
|
||||
continue
|
||||
for j, old_item in enumerate(old):
|
||||
if self.isCanceled():
|
||||
if self.should_suspend() or self.should_cancel():
|
||||
# Chances are that we got an empty Kodi playlist due to
|
||||
# Kodi exit
|
||||
return
|
||||
|
@ -189,7 +189,7 @@ class PlayqueueMonitor(backgroundthread.KillableThread):
|
|||
for j in range(i, len(index)):
|
||||
index[j] += 1
|
||||
for i in reversed(index):
|
||||
if self.isCanceled():
|
||||
if self.should_suspend() or self.should_cancel():
|
||||
# Chances are that we got an empty Kodi playlist due to
|
||||
# Kodi exit
|
||||
return
|
||||
|
@ -212,9 +212,10 @@ class PlayqueueMonitor(backgroundthread.KillableThread):
|
|||
LOG.info("----===## PlayqueueMonitor stopped ##===----")
|
||||
|
||||
def _run(self):
|
||||
while not self.isCanceled():
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
while not self.should_cancel():
|
||||
if self.should_suspend():
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
with app.APP.lock_playqueues:
|
||||
for playqueue in PLAYQUEUES:
|
||||
kodi_pl = js.playlist_get_items(playqueue.playlistid)
|
||||
|
@ -228,4 +229,4 @@ class PlayqueueMonitor(backgroundthread.KillableThread):
|
|||
# compare old and new playqueue
|
||||
self._compare_playqueues(playqueue, kodi_pl)
|
||||
playqueue.old_kodi_pl = list(kodi_pl)
|
||||
app.APP.monitor.waitForAbort(0.2)
|
||||
self.sleep(0.2)
|
||||
|
|
|
@ -312,12 +312,13 @@ class PlexCompanion(backgroundthread.KillableThread):
|
|||
if httpd:
|
||||
thread = Thread(target=httpd.handle_request)
|
||||
|
||||
while not self.isCanceled():
|
||||
while not self.should_cancel():
|
||||
# If we are not authorized, sleep
|
||||
# Otherwise, we trigger a download which leads to a
|
||||
# re-authorizations
|
||||
if self.wait_while_suspended():
|
||||
break
|
||||
if self.should_suspend():
|
||||
if self.wait_while_suspended():
|
||||
break
|
||||
try:
|
||||
message_count += 1
|
||||
if httpd:
|
||||
|
@ -356,6 +357,6 @@ class PlexCompanion(backgroundthread.KillableThread):
|
|||
app.APP.companion_queue.task_done()
|
||||
# Don't sleep
|
||||
continue
|
||||
app.APP.monitor.waitForAbort(0.05)
|
||||
self.sleep(0.05)
|
||||
subscription_manager.signal_stop()
|
||||
client.stop_all()
|
||||
|
|
|
@ -101,7 +101,7 @@ class Service(object):
|
|||
self._init_done = True
|
||||
|
||||
@staticmethod
|
||||
def isCanceled():
|
||||
def should_cancel():
|
||||
return xbmc.abortRequested or app.APP.stop_pkc
|
||||
|
||||
def on_connection_check(self, result):
|
||||
|
@ -437,7 +437,7 @@ class Service(object):
|
|||
self.playqueue = playqueue.PlayqueueMonitor()
|
||||
|
||||
# Main PKC program loop
|
||||
while not self.isCanceled():
|
||||
while not self.should_cancel():
|
||||
|
||||
# Check for PKC commands from other Python instances
|
||||
plex_command = utils.window('plexkodiconnect.command')
|
||||
|
|
|
@ -38,7 +38,9 @@ class Sync(backgroundthread.KillableThread):
|
|||
self.start_library_sync(show_dialog=True,
|
||||
repair=app.SYNC.run_lib_scan == 'repair',
|
||||
block=True)
|
||||
if not self.sync_successful and not self.isSuspended() and not self.isCanceled():
|
||||
if (not self.sync_successful and
|
||||
not self.should_suspend() and
|
||||
not self.should_cancel()):
|
||||
# ERROR in library sync
|
||||
LOG.warn('Triggered full/repair sync has not been successful')
|
||||
elif app.SYNC.run_lib_scan == 'fanart':
|
||||
|
@ -112,7 +114,7 @@ class Sync(backgroundthread.KillableThread):
|
|||
LOG.info('Not synching Plex artwork - not caching')
|
||||
return
|
||||
if self.image_cache_thread and self.image_cache_thread.is_alive():
|
||||
self.image_cache_thread.abort()
|
||||
self.image_cache_thread.cancel()
|
||||
self.image_cache_thread.join()
|
||||
self.image_cache_thread = artwork.ImageCachingThread()
|
||||
self.image_cache_thread.start()
|
||||
|
@ -163,10 +165,11 @@ class Sync(backgroundthread.KillableThread):
|
|||
|
||||
utils.init_dbs()
|
||||
|
||||
while not self.isCanceled():
|
||||
while not self.should_cancel():
|
||||
# In the event the server goes offline
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
if self.should_suspend():
|
||||
if self.wait_while_suspended():
|
||||
return
|
||||
if not install_sync_done:
|
||||
# Very FIRST sync ever upon installation or reset of Kodi DB
|
||||
LOG.info('Initial start-up full sync starting')
|
||||
|
@ -188,7 +191,7 @@ class Sync(backgroundthread.KillableThread):
|
|||
self.start_image_cache_thread()
|
||||
else:
|
||||
LOG.error('Initial start-up full sync unsuccessful')
|
||||
app.APP.monitor.waitForAbort(1)
|
||||
self.sleep(1)
|
||||
xbmc.executebuiltin('InhibitIdleShutdown(false)')
|
||||
|
||||
elif not initial_sync_done:
|
||||
|
@ -205,7 +208,7 @@ class Sync(backgroundthread.KillableThread):
|
|||
self.start_image_cache_thread()
|
||||
else:
|
||||
LOG.info('Startup sync has not yet been successful')
|
||||
app.APP.monitor.waitForAbort(1)
|
||||
self.sleep(1)
|
||||
|
||||
# Currently no db scan, so we could start a new scan
|
||||
else:
|
||||
|
@ -240,9 +243,9 @@ class Sync(backgroundthread.KillableThread):
|
|||
library_sync.store_websocket_message(message)
|
||||
queue.task_done()
|
||||
# Sleep just a bit
|
||||
app.APP.monitor.waitForAbort(0.01)
|
||||
self.sleep(0.01)
|
||||
continue
|
||||
app.APP.monitor.waitForAbort(0.1)
|
||||
self.sleep(0.1)
|
||||
# Shut down playlist monitoring
|
||||
if playlist_monitor:
|
||||
playlist_monitor.stop()
|
||||
|
|
|
@ -19,7 +19,7 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
def __init__(self):
|
||||
self.ws = None
|
||||
self.redirect_uri = None
|
||||
self.sleeptime = 0
|
||||
self.sleeptime = 0.0
|
||||
super(WebSocket, self).__init__()
|
||||
|
||||
def process(self, opcode, message):
|
||||
|
@ -46,15 +46,15 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
def getUri(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def __sleep(self):
|
||||
def _sleep_cycle(self):
|
||||
"""
|
||||
Sleeps for 2^self.sleeptime where sleeping period will be doubled with
|
||||
each unsuccessful connection attempt.
|
||||
Will sleep at most 64 seconds
|
||||
"""
|
||||
app.APP.monitor.waitForAbort(2**self.sleeptime)
|
||||
self.sleep(2 ** self.sleeptime)
|
||||
if self.sleeptime < 6:
|
||||
self.sleeptime += 1
|
||||
self.sleeptime += 1.0
|
||||
|
||||
def run(self):
|
||||
LOG.info("----===## Starting %s ##===----", self.__class__.__name__)
|
||||
|
@ -69,9 +69,9 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__)
|
||||
|
||||
def _run(self):
|
||||
while not self.isCanceled():
|
||||
while not self.should_cancel():
|
||||
# In the event the server goes offline
|
||||
if self.isSuspended():
|
||||
if self.should_suspend():
|
||||
# Set in service.py
|
||||
if self.ws is not None:
|
||||
self.ws.close()
|
||||
|
@ -99,11 +99,11 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
# Server is probably offline
|
||||
LOG.debug("%s: IOError connecting", self.__class__.__name__)
|
||||
self.ws = None
|
||||
self.__sleep()
|
||||
self._sleep_cycle()
|
||||
except websocket.WebSocketTimeoutException:
|
||||
LOG.debug("%s: WebSocketTimeoutException", self.__class__.__name__)
|
||||
self.ws = None
|
||||
self.__sleep()
|
||||
self._sleep_cycle()
|
||||
except websocket.WebsocketRedirect as e:
|
||||
LOG.debug('301 redirect detected: %s', e)
|
||||
self.redirect_uri = e.headers.get('location',
|
||||
|
@ -111,11 +111,11 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
if self.redirect_uri:
|
||||
self.redirect_uri = self.redirect_uri.decode('utf-8')
|
||||
self.ws = None
|
||||
self.__sleep()
|
||||
self._sleep_cycle()
|
||||
except websocket.WebSocketException as e:
|
||||
LOG.debug('%s: WebSocketException: %s', self.__class__.__name__, e)
|
||||
self.ws = None
|
||||
self.__sleep()
|
||||
self._sleep_cycle()
|
||||
except Exception as e:
|
||||
LOG.error('%s: Unknown exception encountered when '
|
||||
'connecting: %s', self.__class__.__name__, e)
|
||||
|
@ -123,9 +123,9 @@ class WebSocket(backgroundthread.KillableThread):
|
|||
LOG.error("%s: Traceback:\n%s",
|
||||
self.__class__.__name__, traceback.format_exc())
|
||||
self.ws = None
|
||||
self.__sleep()
|
||||
self._sleep_cycle()
|
||||
else:
|
||||
self.sleeptime = 0
|
||||
self.sleeptime = 0.0
|
||||
except Exception as e:
|
||||
LOG.error("%s: Unknown exception encountered: %s",
|
||||
self.__class__.__name__, e)
|
||||
|
@ -141,7 +141,7 @@ class PMS_Websocket(WebSocket):
|
|||
"""
|
||||
Websocket connection with the PMS for Plex Companion
|
||||
"""
|
||||
def isSuspended(self):
|
||||
def should_suspend(self):
|
||||
"""
|
||||
Returns True if the thread is suspended
|
||||
"""
|
||||
|
@ -206,7 +206,7 @@ class Alexa_Websocket(WebSocket):
|
|||
"""
|
||||
Websocket connection to talk to Amazon Alexa.
|
||||
"""
|
||||
def isSuspended(self):
|
||||
def should_suspend(self):
|
||||
"""
|
||||
Overwrite method since we need to check for plex token
|
||||
"""
|
||||
|
|
|
@ -24,7 +24,7 @@ class UserThumbTask(backgroundthread.Task):
|
|||
|
||||
def run(self):
|
||||
for user in self.users:
|
||||
if self.isCanceled():
|
||||
if self.should_cancel():
|
||||
return
|
||||
thumb, back = user.thumb, ''
|
||||
self.callback(user, thumb, back)
|
||||
|
|
Loading…
Reference in a new issue