From c85e1e2bd07799d1262afc897213ba6c9d967320 Mon Sep 17 00:00:00 2001 From: croneter Date: Thu, 28 Nov 2019 17:49:48 +0100 Subject: [PATCH] Optimize threads by using events instead of a polling mechanism. Fixes PKC become unresponsive, e.g. when switching users --- resources/lib/app/application.py | 21 +-- resources/lib/artwork.py | 18 ++- resources/lib/backgroundthread.py | 158 ++++++++------------- resources/lib/library_sync/common.py | 28 ---- resources/lib/library_sync/fanart.py | 63 ++++---- resources/lib/library_sync/full_sync.py | 33 +++-- resources/lib/library_sync/get_metadata.py | 19 ++- resources/lib/library_sync/sections.py | 10 +- resources/lib/playlists/__init__.py | 10 +- resources/lib/playqueue.py | 13 +- resources/lib/plex_companion.py | 9 +- resources/lib/service_entry.py | 4 +- resources/lib/sync.py | 21 +-- resources/lib/websocket_client.py | 28 ++-- resources/lib/windows/userselect.py | 2 +- 15 files changed, 194 insertions(+), 243 deletions(-) diff --git a/resources/lib/app/application.py b/resources/lib/app/application.py index 9d3fc0d0..d2e919d3 100644 --- a/resources/lib/app/application.py +++ b/resources/lib/app/application.py @@ -124,19 +124,16 @@ class App(object): if block: while True: for thread in self.threads: - if not thread.suspend_reached: + if not thread.is_suspended(): LOG.debug('Waiting for thread to suspend: %s', thread) # Send suspend signal again in case self.threads # changed - thread.suspend() - if self.monitor.waitForAbort(0.1): - return True - break + thread.suspend(block=True) else: break return xbmc.abortRequested - def resume_threads(self, block=True): + def resume_threads(self): """ Resume all thread activity with or without blocking. Returns True only if PKC shutdown requested @@ -144,16 +141,6 @@ class App(object): LOG.debug('Resuming threads: %s', self.threads) for thread in self.threads: thread.resume() - if block: - while True: - for thread in self.threads: - if thread.suspend_reached: - LOG.debug('Waiting for thread to resume: %s', thread) - if self.monitor.waitForAbort(0.1): - return True - break - else: - break return xbmc.abortRequested def stop_threads(self, block=True): @@ -163,7 +150,7 @@ class App(object): """ LOG.debug('Killing threads: %s', self.threads) for thread in self.threads: - thread.abort() + thread.cancel() if block: while self.threads: LOG.debug('Waiting for threads to exit: %s', self.threads) diff --git a/resources/lib/artwork.py b/resources/lib/artwork.py index 47fdfd45..113d8a32 100644 --- a/resources/lib/artwork.py +++ b/resources/lib/artwork.py @@ -33,8 +33,8 @@ class ImageCachingThread(backgroundthread.KillableThread): if not utils.settings('imageSyncDuringPlayback') == 'true': self.suspend_points.append((app.APP, 'is_playing_video')) - def isSuspended(self): - return any(getattr(obj, txt) for obj, txt in self.suspend_points) + def should_suspend(self): + return any(getattr(obj, attrib) for obj, attrib in self.suspend_points) @staticmethod def _url_generator(kind, kodi_type): @@ -73,18 +73,26 @@ class ImageCachingThread(backgroundthread.KillableThread): app.APP.deregister_caching_thread(self) LOG.info("---===### Stopped ImageCachingThread ###===---") - def _run(self): + def _loop(self): kinds = [KodiVideoDB] if app.SYNC.enable_music: kinds.append(KodiMusicDB) for kind in kinds: for kodi_type in ('poster', 'fanart'): for url in self._url_generator(kind, kodi_type): - if self.wait_while_suspended(): - return + if self.should_suspend() or self.should_cancel(): + return False cache_url(url) # Toggles Image caching completed to Yes utils.settings('plex_status_image_caching', value=utils.lang(107)) + return True + + def _run(self): + while True: + if self._loop(): + break + if self.wait_while_suspended(): + break def cache_url(url): diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 7823fffd..45f7602f 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -13,131 +13,95 @@ LOG = getLogger('PLEX.threads') class KillableThread(threading.Thread): - '''A thread class that supports raising exception in the thread from - another thread. - ''' - # def _get_my_tid(self): - # """determines this (self's) thread id - - # CAREFUL : this function is executed in the context of the caller - # thread, to get the identity of the thread represented by this - # instance. - # """ - # if not self.isAlive(): - # raise threading.ThreadError("the thread is not active") - - # return self.ident - - # def _raiseExc(self, exctype): - # """Raises the given exception type in the context of this thread. - - # If the thread is busy in a system call (time.sleep(), - # socket.accept(), ...), the exception is simply ignored. - - # If you are sure that your exception should terminate the thread, - # one way to ensure that it works is: - - # t = ThreadWithExc( ... ) - # ... - # t.raiseExc( SomeException ) - # while t.isAlive(): - # time.sleep( 0.1 ) - # t.raiseExc( SomeException ) - - # If the exception is to be caught by the thread, you need a way to - # check that your thread has caught it. - - # CAREFUL : this function is executed in the context of the - # caller thread, to raise an excpetion in the context of the - # thread represented by this instance. - # """ - # _async_raise(self._get_my_tid(), exctype) - - def kill(self, force_and_wait=False): - pass - # try: - # self._raiseExc(KillThreadException) - - # if force_and_wait: - # time.sleep(0.1) - # while self.isAlive(): - # self._raiseExc(KillThreadException) - # time.sleep(0.1) - # except threading.ThreadError: - # pass - - # def onKilled(self): - # pass - - # def run(self): - # try: - # self._Thread__target(*self._Thread__args, **self._Thread__kwargs) - # except KillThreadException: - # self.onKilled() - def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self._canceled = False - # Set to True to set the thread to suspended self._suspended = False - # Thread will return True only if suspended state is reached - self.suspend_reached = False + self._is_not_suspended = threading.Event() + self._is_not_suspended.set() + self._suspension_reached = threading.Event() + self._is_not_asleep = threading.Event() + self._is_not_asleep.set() + self.suspension_timeout = None super(KillableThread, self).__init__(group, target, name, args, kwargs) - def isCanceled(self): + def should_cancel(self): """ - Returns True if the thread is stopped + Returns True if the thread should be stopped immediately """ - if self._canceled or xbmc.abortRequested: - return True - return False + return self._canceled or app.APP.stop_pkc - def abort(self): + def cancel(self): """ - Call to stop this thread + Call from another thread to stop this current thread """ self._canceled = True + # Make sure thread is running in order to exit quickly + self._is_not_suspended.set() + self._is_not_asleep.set() - def suspend(self, block=False): + def should_suspend(self): """ - Call to suspend this thread + Returns True if the current thread should be suspended immediately """ + return self._suspended + + def suspend(self, block=False, timeout=None): + """ + Call from another thread to suspend the current thread. Provide a + timeout [float] in seconds optionally. block=True will block the caller + until the thread-to-be-suspended is indeed suspended + Will wake a thread that is asleep! + """ + self.suspension_timeout = timeout self._suspended = True + self._is_not_suspended.clear() + # Make sure thread wakes up in order to suspend + self._is_not_asleep.set() if block: - while not self.suspend_reached: - LOG.debug('Waiting for thread to suspend: %s', self) - if app.APP.monitor.waitForAbort(0.1): - return + self._suspension_reached.wait() def resume(self): """ - Call to revive a suspended thread back to life + Call from another thread to revive a suspended or asleep current thread + back to life """ self._suspended = False + self._is_not_suspended.set() + self._is_not_asleep.set() def wait_while_suspended(self): """ Blocks until thread is not suspended anymore or the thread should - exit. - Returns True only if the thread should exit (=isCanceled()) + exit or for a period of self.suspension_timeout (set by the caller of + suspend()) + Returns the value of should_cancel() """ - while self.isSuspended(): - try: - self.suspend_reached = True - # Set in service.py - if self.isCanceled(): - # Abort was requested while waiting. We should exit - return True - if app.APP.monitor.waitForAbort(0.1): - return True - finally: - self.suspend_reached = False - return self.isCanceled() + self._suspension_reached.set() + self._is_not_suspended.wait(self.suspension_timeout) + self._suspension_reached.clear() + return self.should_cancel() - def isSuspended(self): + def is_suspended(self): """ - Returns True if the thread is suspended + Check from another thread whether the current thread is suspended """ - return self._suspended + return self._suspension_reached.is_set() + + def sleep(self, timeout): + """ + Only call from the current thread in order to sleep for a period of + timeout [float, seconds]. Will unblock immediately if thread should + cancel (should_cancel()) or the thread should_suspend + """ + self._is_not_asleep.clear() + self._is_not_asleep.wait(timeout) + self._is_not_asleep.set() + + def is_asleep(self): + """ + Check from another thread whether the current thread is asleep + """ + return not self._is_not_asleep.is_set() class OrderedQueue(Queue.PriorityQueue, object): @@ -239,7 +203,7 @@ class Task(object): def cancel(self): self._canceled = True - def isCanceled(self): + def should_cancel(self): return self._canceled or xbmc.abortRequested def isValid(self): diff --git a/resources/lib/library_sync/common.py b/resources/lib/library_sync/common.py index 5497dbf8..c2481956 100644 --- a/resources/lib/library_sync/common.py +++ b/resources/lib/library_sync/common.py @@ -9,34 +9,6 @@ PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and utils.settings('enablePlaylistSync') == 'true') -class fullsync_mixin(object): - def __init__(self): - self._canceled = False - - def abort(self): - """Hit method to terminate the thread""" - self._canceled = True - # Let's NOT suspend sync threads but immediately terminate them - suspend = abort - - @property - def suspend_reached(self): - """Since we're not suspending, we'll never set it to True""" - return False - - @suspend_reached.setter - def suspend_reached(self): - pass - - def resume(self): - """Obsolete since we're not suspending""" - pass - - def isCanceled(self): - """Check whether we should exit this thread""" - return self._canceled - - def update_kodi_library(video=True, music=True): """ Updates the Kodi library and thus refreshes the Kodi views and widgets diff --git a/resources/lib/library_sync/fanart.py b/resources/lib/library_sync/fanart.py index 7a4b59ff..9ad26a7a 100644 --- a/resources/lib/library_sync/fanart.py +++ b/resources/lib/library_sync/fanart.py @@ -27,48 +27,51 @@ class FanartThread(backgroundthread.KillableThread): self.refresh = refresh super(FanartThread, self).__init__() - def isSuspended(self): + def should_suspend(self): return self._suspended or app.APP.is_playing_video def run(self): LOG.info('Starting FanartThread') app.APP.register_fanart_thread(self) try: - self._run_internal() + self._run() except Exception: utils.ERROR(notify=True) finally: app.APP.deregister_fanart_thread(self) - def _run_internal(self): + def _loop(self): + for typus in SUPPORTED_TYPES: + offset = 0 + while True: + with PlexDB() as plexdb: + # Keep DB connection open only for a short period of time! + if self.refresh: + batch = list(plexdb.every_plex_id(typus, + offset, + BATCH_SIZE)) + else: + batch = list(plexdb.missing_fanart(typus, + offset, + BATCH_SIZE)) + for plex_id in batch: + # Do the actual, time-consuming processing + if self.should_suspend() or self.should_cancel(): + return False + process_fanart(plex_id, typus, self.refresh) + if len(batch) < BATCH_SIZE: + break + offset += BATCH_SIZE + return True + + def _run(self): finished = False - try: - for typus in SUPPORTED_TYPES: - offset = 0 - while True: - with PlexDB() as plexdb: - # Keep DB connection open only for a short period of time! - if self.refresh: - batch = list(plexdb.every_plex_id(typus, - offset, - BATCH_SIZE)) - else: - batch = list(plexdb.missing_fanart(typus, - offset, - BATCH_SIZE)) - for plex_id in batch: - # Do the actual, time-consuming processing - if self.wait_while_suspended(): - return - process_fanart(plex_id, typus, self.refresh) - if len(batch) < BATCH_SIZE: - break - offset += BATCH_SIZE - else: - finished = True - finally: - LOG.info('FanartThread finished: %s', finished) - self.callback(finished) + while not finished: + finished = self._loop() + if self.wait_while_suspended(): + break + LOG.info('FanartThread finished: %s', finished) + self.callback(finished) class FanartTask(backgroundthread.Task): diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index e3f4f1eb..609d45b6 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -39,7 +39,7 @@ class InitNewSection(object): self.plex_type = plex_type -class FullSync(common.fullsync_mixin): +class FullSync(backgroundthread.KillableThread): def __init__(self, repair, callback, show_dialog): """ repair=True: force sync EVERY item @@ -75,6 +75,12 @@ class FullSync(common.fullsync_mixin): worker_count=self.worker_count) super(FullSync, self).__init__() + def suspend(self, block=False, timeout=None): + """ + Let's NOT suspend sync threads but immediately terminate them + """ + self.cancel() + def update_progressbar(self): if self.dialog: try: @@ -112,7 +118,7 @@ class FullSync(common.fullsync_mixin): if not self.section: _, self.section = self.queue.get() self.queue.task_done() - while not self.isCanceled() and self.item_count > 0: + while not self.should_cancel() and self.item_count > 0: section = self.section if not section: break @@ -124,12 +130,12 @@ class FullSync(common.fullsync_mixin): self.section_type_text = utils.lang( v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) with section.context(self.current_sync) as context: - while not self.isCanceled() and self.item_count > 0: + while not self.should_cancel() and self.item_count > 0: try: _, item = self.queue.get(block=False) except backgroundthread.Queue.Empty: if self.threader.threader.working(): - app.APP.monitor.waitForAbort(0.02) + self.sleep(0.02) continue else: # Try again, in case a thread just finished @@ -187,7 +193,7 @@ class FullSync(common.fullsync_mixin): # Check Plex DB to see what we need to add/update with PlexDB() as self.plexdb: for last, xml_item in loop: - if self.isCanceled(): + if self.should_cancel(): return False self.process_item(xml_item) if self.item_count == BATCH_SIZE: @@ -227,7 +233,7 @@ class FullSync(common.fullsync_mixin): while True: with section.context(self.current_sync) as itemtype: for i, (last, xml_item) in enumerate(loop): - if self.isCanceled(): + if self.should_cancel(): return False if not itemtype.update_userdata(xml_item, section.plex_type): # Somehow did not sync this item yet @@ -256,7 +262,7 @@ class FullSync(common.fullsync_mixin): for kind in kinds: for section in (x for x in app.SYNC.sections if x.section_type == kind[1]): - if self.isCanceled(): + if self.should_cancel(): LOG.debug('Need to exit now') return if not section.sync_to_kodi: @@ -332,7 +338,7 @@ class FullSync(common.fullsync_mixin): self.get_children = section.get_children self.queue = section.Queue() # Now do the heavy lifting - if self.isCanceled() or not self.addupdate_section(section): + if self.should_cancel() or not self.addupdate_section(section): return False if self.section_success: # Need to check because a thread might have missed to get @@ -391,7 +397,7 @@ class FullSync(common.fullsync_mixin): self.context = section.context self.get_children = section.get_children # Now do the heavy lifting - if self.isCanceled() or not self.playstate_per_section(section): + if self.should_cancel() or not self.playstate_per_section(section): return False # Delete movies that are not on Plex anymore @@ -416,7 +422,7 @@ class FullSync(common.fullsync_mixin): self.current_sync, BATCH_SIZE)) for plex_id in plex_ids: - if self.isCanceled(): + if self.should_cancel(): return False ctx.remove(plex_id, plex_type) if len(plex_ids) < BATCH_SIZE: @@ -436,7 +442,7 @@ class FullSync(common.fullsync_mixin): def _run(self): self.current_sync = timing.plex_now() # Get latest Plex libraries and build playlist and video node files - if self.isCanceled() or not sections.sync_from_pms(self): + if self.should_cancel() or not sections.sync_from_pms(self): return self.successful = True try: @@ -447,7 +453,7 @@ class FullSync(common.fullsync_mixin): # Actual syncing - do only new items first LOG.info('Running full_library_sync with repair=%s', self.repair) - if self.isCanceled() or not self.full_library_sync(): + if self.should_cancel() or not self.full_library_sync(): self.successful = False return finally: @@ -457,7 +463,7 @@ class FullSync(common.fullsync_mixin): if self.threader: self.threader.shutdown() self.threader = None - if not self.successful and not self.isCanceled(): + if not self.successful and not self.should_cancel(): # "ERROR in library sync" utils.dialog('notification', heading='{plex}', @@ -468,4 +474,5 @@ class FullSync(common.fullsync_mixin): def start(show_dialog, repair=False, callback=None): + # Call run() and NOT start in order to not spawn another thread FullSync(repair, callback, show_dialog).run() diff --git a/resources/lib/library_sync/get_metadata.py b/resources/lib/library_sync/get_metadata.py index 6cd653c2..427169e2 100644 --- a/resources/lib/library_sync/get_metadata.py +++ b/resources/lib/library_sync/get_metadata.py @@ -2,7 +2,6 @@ from __future__ import absolute_import, division, unicode_literals from logging import getLogger -from . import common from ..plex_api import API from .. import plex_functions as PF, backgroundthread, utils, variables as v @@ -27,7 +26,7 @@ def reset_collections(): COLLECTION_XMLS = {} -class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): +class GetMetadataTask(backgroundthread.Task): """ Threaded download of Plex XML metadata for a certain library item. Fills the queue with the downloaded etree XML objects @@ -45,6 +44,12 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): self.count = count super(GetMetadataTask, self).__init__() + def suspend(self, block=False, timeout=None): + """ + Let's NOT suspend sync threads but immediately terminate them + """ + self.cancel() + def _collections(self, item): global COLLECTION_MATCH, COLLECTION_XMLS api = API(item['xml'][0]) @@ -59,7 +64,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): utils.cast(int, x.get('ratingKey'))) for x in COLLECTION_MATCH] item['children'] = {} for plex_set_id, set_name in api.collections(): - if self.isCanceled(): + if self.should_cancel(): return if plex_set_id not in COLLECTION_XMLS: # Get Plex metadata for collections - a pain @@ -84,7 +89,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): """ Do the work """ - if self.isCanceled(): + if self.should_cancel(): return # Download Metadata item = { @@ -101,7 +106,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): 'Cancelling sync for now') utils.window('plex_scancrashed', value='401') return - if not self.isCanceled() and self.plex_type == v.PLEX_TYPE_MOVIE: + if not self.should_cancel() and self.plex_type == v.PLEX_TYPE_MOVIE: # Check for collections/sets collections = False for child in item['xml'][0]: @@ -112,7 +117,7 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): global LOCK with LOCK: self._collections(item) - if not self.isCanceled() and self.get_children: + if not self.should_cancel() and self.get_children: children_xml = PF.GetAllPlexChildren(self.plex_id) try: children_xml[0].attrib @@ -121,5 +126,5 @@ class GetMetadataTask(common.fullsync_mixin, backgroundthread.Task): self.plex_id) else: item['children'] = children_xml - if not self.isCanceled(): + if not self.should_cancel(): self.queue.put((self.count, item)) diff --git a/resources/lib/library_sync/sections.py b/resources/lib/library_sync/sections.py index a1b78f23..451673b3 100644 --- a/resources/lib/library_sync/sections.py +++ b/resources/lib/library_sync/sections.py @@ -16,7 +16,7 @@ LOG = getLogger('PLEX.sync.sections') BATCH_SIZE = 500 # Need a way to interrupt our synching process -IS_CANCELED = None +SHOULD_CANCEL = None LIBRARY_PATH = path_ops.translate_path('special://profile/library/video/') # The video library might not yet exist for this user - create it @@ -490,7 +490,7 @@ def _delete_kodi_db_items(section): with kodi_context(texture_db=True) as kodidb: typus = context(None, plexdb=plexdb, kodidb=kodidb) for plex_id in plex_ids: - if IS_CANCELED(): + if SHOULD_CANCEL(): return False typus.remove(plex_id) if len(plex_ids) < BATCH_SIZE: @@ -582,13 +582,13 @@ def sync_from_pms(parent_self, pick_libraries=False): pick_libraries=True will prompt the user the select the libraries he wants to sync """ - global IS_CANCELED + global SHOULD_CANCEL LOG.info('Starting synching sections from the PMS') - IS_CANCELED = parent_self.isCanceled + SHOULD_CANCEL = parent_self.should_cancel try: return _sync_from_pms(pick_libraries) finally: - IS_CANCELED = None + SHOULD_CANCEL = None LOG.info('Done synching sections from the PMS: %s', app.SYNC.sections) diff --git a/resources/lib/playlists/__init__.py b/resources/lib/playlists/__init__.py index b3658ba0..3553c655 100644 --- a/resources/lib/playlists/__init__.py +++ b/resources/lib/playlists/__init__.py @@ -38,7 +38,7 @@ SUPPORTED_FILETYPES = ( ############################################################################### -def isCanceled(): +def should_cancel(): return app.APP.stop_pkc or app.SYNC.stop_sync @@ -167,7 +167,7 @@ def _full_sync(): # before. If yes, make sure that hashes are identical. If not, sync it. old_plex_ids = db.plex_playlist_ids() for xml_playlist in xml: - if isCanceled(): + if should_cancel(): return False api = API(xml_playlist) try: @@ -199,7 +199,7 @@ def _full_sync(): LOG.info('Could not recreate playlist %s', api.plex_id) # Get rid of old Plex playlists that were deleted on the Plex side for plex_id in old_plex_ids: - if isCanceled(): + if should_cancel(): return False playlist = db.get_playlist(plex_id=plex_id) LOG.debug('Removing outdated Plex playlist from Kodi: %s', playlist) @@ -213,7 +213,7 @@ def _full_sync(): old_kodi_paths = db.kodi_playlist_paths() for root, _, files in path_ops.walk(v.PLAYLIST_PATH): for f in files: - if isCanceled(): + if should_cancel(): return False path = path_ops.path.join(root, f) try: @@ -244,7 +244,7 @@ def _full_sync(): except PlaylistError: LOG.info('Skipping Kodi playlist %s', path) for kodi_path in old_kodi_paths: - if isCanceled(): + if should_cancel(): return False playlist = db.get_playlist(path=kodi_path) if not playlist: diff --git a/resources/lib/playqueue.py b/resources/lib/playqueue.py index 7e78bee9..45fb1d50 100644 --- a/resources/lib/playqueue.py +++ b/resources/lib/playqueue.py @@ -120,7 +120,7 @@ class PlayqueueMonitor(backgroundthread.KillableThread): # Ignore new media added by other addons continue for j, old_item in enumerate(old): - if self.isCanceled(): + if self.should_suspend() or self.should_cancel(): # Chances are that we got an empty Kodi playlist due to # Kodi exit return @@ -189,7 +189,7 @@ class PlayqueueMonitor(backgroundthread.KillableThread): for j in range(i, len(index)): index[j] += 1 for i in reversed(index): - if self.isCanceled(): + if self.should_suspend() or self.should_cancel(): # Chances are that we got an empty Kodi playlist due to # Kodi exit return @@ -212,9 +212,10 @@ class PlayqueueMonitor(backgroundthread.KillableThread): LOG.info("----===## PlayqueueMonitor stopped ##===----") def _run(self): - while not self.isCanceled(): - if self.wait_while_suspended(): - return + while not self.should_cancel(): + if self.should_suspend(): + if self.wait_while_suspended(): + return with app.APP.lock_playqueues: for playqueue in PLAYQUEUES: kodi_pl = js.playlist_get_items(playqueue.playlistid) @@ -228,4 +229,4 @@ class PlayqueueMonitor(backgroundthread.KillableThread): # compare old and new playqueue self._compare_playqueues(playqueue, kodi_pl) playqueue.old_kodi_pl = list(kodi_pl) - app.APP.monitor.waitForAbort(0.2) + self.sleep(0.2) diff --git a/resources/lib/plex_companion.py b/resources/lib/plex_companion.py index 036ac864..53a73c4c 100644 --- a/resources/lib/plex_companion.py +++ b/resources/lib/plex_companion.py @@ -312,12 +312,13 @@ class PlexCompanion(backgroundthread.KillableThread): if httpd: thread = Thread(target=httpd.handle_request) - while not self.isCanceled(): + while not self.should_cancel(): # If we are not authorized, sleep # Otherwise, we trigger a download which leads to a # re-authorizations - if self.wait_while_suspended(): - break + if self.should_suspend(): + if self.wait_while_suspended(): + break try: message_count += 1 if httpd: @@ -356,6 +357,6 @@ class PlexCompanion(backgroundthread.KillableThread): app.APP.companion_queue.task_done() # Don't sleep continue - app.APP.monitor.waitForAbort(0.05) + self.sleep(0.05) subscription_manager.signal_stop() client.stop_all() diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index b7b10aed..6d0ac9df 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -101,7 +101,7 @@ class Service(object): self._init_done = True @staticmethod - def isCanceled(): + def should_cancel(): return xbmc.abortRequested or app.APP.stop_pkc def on_connection_check(self, result): @@ -437,7 +437,7 @@ class Service(object): self.playqueue = playqueue.PlayqueueMonitor() # Main PKC program loop - while not self.isCanceled(): + while not self.should_cancel(): # Check for PKC commands from other Python instances plex_command = utils.window('plexkodiconnect.command') diff --git a/resources/lib/sync.py b/resources/lib/sync.py index 87efc477..6ed3eae7 100644 --- a/resources/lib/sync.py +++ b/resources/lib/sync.py @@ -38,7 +38,9 @@ class Sync(backgroundthread.KillableThread): self.start_library_sync(show_dialog=True, repair=app.SYNC.run_lib_scan == 'repair', block=True) - if not self.sync_successful and not self.isSuspended() and not self.isCanceled(): + if (not self.sync_successful and + not self.should_suspend() and + not self.should_cancel()): # ERROR in library sync LOG.warn('Triggered full/repair sync has not been successful') elif app.SYNC.run_lib_scan == 'fanart': @@ -112,7 +114,7 @@ class Sync(backgroundthread.KillableThread): LOG.info('Not synching Plex artwork - not caching') return if self.image_cache_thread and self.image_cache_thread.is_alive(): - self.image_cache_thread.abort() + self.image_cache_thread.cancel() self.image_cache_thread.join() self.image_cache_thread = artwork.ImageCachingThread() self.image_cache_thread.start() @@ -163,10 +165,11 @@ class Sync(backgroundthread.KillableThread): utils.init_dbs() - while not self.isCanceled(): + while not self.should_cancel(): # In the event the server goes offline - if self.wait_while_suspended(): - return + if self.should_suspend(): + if self.wait_while_suspended(): + return if not install_sync_done: # Very FIRST sync ever upon installation or reset of Kodi DB LOG.info('Initial start-up full sync starting') @@ -188,7 +191,7 @@ class Sync(backgroundthread.KillableThread): self.start_image_cache_thread() else: LOG.error('Initial start-up full sync unsuccessful') - app.APP.monitor.waitForAbort(1) + self.sleep(1) xbmc.executebuiltin('InhibitIdleShutdown(false)') elif not initial_sync_done: @@ -205,7 +208,7 @@ class Sync(backgroundthread.KillableThread): self.start_image_cache_thread() else: LOG.info('Startup sync has not yet been successful') - app.APP.monitor.waitForAbort(1) + self.sleep(1) # Currently no db scan, so we could start a new scan else: @@ -240,9 +243,9 @@ class Sync(backgroundthread.KillableThread): library_sync.store_websocket_message(message) queue.task_done() # Sleep just a bit - app.APP.monitor.waitForAbort(0.01) + self.sleep(0.01) continue - app.APP.monitor.waitForAbort(0.1) + self.sleep(0.1) # Shut down playlist monitoring if playlist_monitor: playlist_monitor.stop() diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index e999e669..8ff767b1 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -19,7 +19,7 @@ class WebSocket(backgroundthread.KillableThread): def __init__(self): self.ws = None self.redirect_uri = None - self.sleeptime = 0 + self.sleeptime = 0.0 super(WebSocket, self).__init__() def process(self, opcode, message): @@ -46,15 +46,15 @@ class WebSocket(backgroundthread.KillableThread): def getUri(self): raise NotImplementedError - def __sleep(self): + def _sleep_cycle(self): """ Sleeps for 2^self.sleeptime where sleeping period will be doubled with each unsuccessful connection attempt. Will sleep at most 64 seconds """ - app.APP.monitor.waitForAbort(2**self.sleeptime) + self.sleep(2 ** self.sleeptime) if self.sleeptime < 6: - self.sleeptime += 1 + self.sleeptime += 1.0 def run(self): LOG.info("----===## Starting %s ##===----", self.__class__.__name__) @@ -69,9 +69,9 @@ class WebSocket(backgroundthread.KillableThread): LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__) def _run(self): - while not self.isCanceled(): + while not self.should_cancel(): # In the event the server goes offline - if self.isSuspended(): + if self.should_suspend(): # Set in service.py if self.ws is not None: self.ws.close() @@ -99,11 +99,11 @@ class WebSocket(backgroundthread.KillableThread): # Server is probably offline LOG.debug("%s: IOError connecting", self.__class__.__name__) self.ws = None - self.__sleep() + self._sleep_cycle() except websocket.WebSocketTimeoutException: LOG.debug("%s: WebSocketTimeoutException", self.__class__.__name__) self.ws = None - self.__sleep() + self._sleep_cycle() except websocket.WebsocketRedirect as e: LOG.debug('301 redirect detected: %s', e) self.redirect_uri = e.headers.get('location', @@ -111,11 +111,11 @@ class WebSocket(backgroundthread.KillableThread): if self.redirect_uri: self.redirect_uri = self.redirect_uri.decode('utf-8') self.ws = None - self.__sleep() + self._sleep_cycle() except websocket.WebSocketException as e: LOG.debug('%s: WebSocketException: %s', self.__class__.__name__, e) self.ws = None - self.__sleep() + self._sleep_cycle() except Exception as e: LOG.error('%s: Unknown exception encountered when ' 'connecting: %s', self.__class__.__name__, e) @@ -123,9 +123,9 @@ class WebSocket(backgroundthread.KillableThread): LOG.error("%s: Traceback:\n%s", self.__class__.__name__, traceback.format_exc()) self.ws = None - self.__sleep() + self._sleep_cycle() else: - self.sleeptime = 0 + self.sleeptime = 0.0 except Exception as e: LOG.error("%s: Unknown exception encountered: %s", self.__class__.__name__, e) @@ -141,7 +141,7 @@ class PMS_Websocket(WebSocket): """ Websocket connection with the PMS for Plex Companion """ - def isSuspended(self): + def should_suspend(self): """ Returns True if the thread is suspended """ @@ -206,7 +206,7 @@ class Alexa_Websocket(WebSocket): """ Websocket connection to talk to Amazon Alexa. """ - def isSuspended(self): + def should_suspend(self): """ Overwrite method since we need to check for plex token """ diff --git a/resources/lib/windows/userselect.py b/resources/lib/windows/userselect.py index d68ac1ad..84427741 100644 --- a/resources/lib/windows/userselect.py +++ b/resources/lib/windows/userselect.py @@ -24,7 +24,7 @@ class UserThumbTask(backgroundthread.Task): def run(self): for user in self.users: - if self.isCanceled(): + if self.should_cancel(): return thumb, back = user.thumb, '' self.callback(user, thumb, back)