From 1787e51c7cc821ccd49d206c5a0d4ca3eeda4168 Mon Sep 17 00:00:00 2001 From: croneter Date: Wed, 30 Jan 2019 20:36:52 +0100 Subject: [PATCH] Zentrally register threads and introduce a way to wait for their suspension --- resources/lib/app/account.py | 14 ++- resources/lib/app/application.py | 140 ++++++++++++++++++++++-- resources/lib/app/connection.py | 18 ++- resources/lib/app/libsync.py | 79 ++++++++----- resources/lib/artwork.py | 20 ++-- resources/lib/backgroundthread.py | 29 ++++- resources/lib/kodimonitor.py | 5 +- resources/lib/library_sync/common.py | 36 ++++-- resources/lib/library_sync/fanart.py | 29 ++--- resources/lib/library_sync/full_sync.py | 16 ++- resources/lib/library_sync/sections.py | 19 +++- resources/lib/library_sync/websocket.py | 7 -- resources/lib/playback.py | 2 +- resources/lib/playlists/__init__.py | 2 +- resources/lib/playqueue.py | 21 ++-- resources/lib/plex_api.py | 2 +- resources/lib/plex_companion.py | 16 +-- resources/lib/plexbmchelper/listener.py | 2 +- resources/lib/service_entry.py | 76 ++++++------- resources/lib/sync.py | 70 +++--------- resources/lib/utils.py | 16 +-- resources/lib/websocket_client.py | 16 +-- 22 files changed, 381 insertions(+), 254 deletions(-) diff --git a/resources/lib/app/account.py b/resources/lib/app/account.py index 112df454..3a8ffb7f 100644 --- a/resources/lib/app/account.py +++ b/resources/lib/app/account.py @@ -10,12 +10,22 @@ LOG = getLogger('PLEX.account') class Account(object): def __init__(self, entrypoint=False): + self.plex_login = None + self.plex_login_id = None + self.plex_username = None + self.plex_user_id = None + self.plex_token = None + self.pms_token = None + self.avatar = None + self.myplexlogin = None + self.restricted_user = None + self.force_login = None + self._session = None + self.authenticated = False if entrypoint: self.load_entrypoint() else: - self.authenticated = False utils.window('plex_authenticated', clear=True) - self._session = None self.load() def set_authenticated(self): diff --git a/resources/lib/app/application.py b/resources/lib/app/application.py index c6226f51..dd850a53 100644 --- a/resources/lib/app/application.py +++ b/resources/lib/app/application.py @@ -1,27 +1,32 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from __future__ import absolute_import, division, unicode_literals +from logging import getLogger import Queue from threading import Lock, RLock +import xbmc + from .. import utils +LOG = getLogger('PLEX.app') + class App(object): """ This class is used to store variables across PKC modules """ def __init__(self, entrypoint=False): + self.fetch_pms_item_number = None + self.force_reload_skin = None if entrypoint: self.load_entrypoint() else: self.load() # Quit PKC? self.stop_pkc = False - # Shall we completely suspend PKC and our threads? + # This will suspend the main thread also self.suspend = False - # Shall we only suspend threads? - self._suspend_threads = False # Need to lock all methods and functions messing with Plex Companion subscribers self.lock_subscriber = RLock() # Need to lock everything messing with Kodi/PKC playqueues @@ -38,6 +43,127 @@ class App(object): self.monitor = None # xbmc.Player() instance self.player = None + # All thread instances + self.threads = [] + # Instance of FanartThread() + self.fanart_thread = None + # Instance of ImageCachingThread() + self.caching_thread = None + + @property + def is_playing(self): + return self.player.isPlaying() + + @property + def is_playing_video(self): + return self.player.isPlayingVideo() + + def register_fanart_thread(self, thread): + self.fanart_thread = thread + self.threads.append(thread) + + def deregister_fanart_thread(self, thread): + self.fanart_thread = None + self.threads.remove(thread) + + def suspend_fanart_thread(self, block=True): + try: + self.fanart_thread.suspend(block=block) + except AttributeError: + pass + + def resume_fanart_thread(self): + try: + self.fanart_thread.resume() + except AttributeError: + pass + + def register_caching_thread(self, thread): + self.caching_thread = thread + self.threads.append(thread) + + def deregister_caching_thread(self, thread): + self.caching_thread = None + self.threads.remove(thread) + + def suspend_caching_thread(self, block=True): + try: + self.caching_thread.suspend(block=block) + except AttributeError: + pass + + def resume_caching_thread(self): + try: + self.caching_thread.resume() + except AttributeError: + pass + + def register_thread(self, thread): + """ + Hit with thread [backgroundthread.Killablethread instance] to register + any and all threads + """ + self.threads.append(thread) + + def deregister_thread(self, thread): + """ + Sync thread has done it's work and is e.g. about to die + """ + self.threads.remove(thread) + + def suspend_threads(self, block=True): + """ + Suspend all threads' activity with or without blocking. + Returns True only if PKC shutdown requested + """ + LOG.debug('Suspending threads: %s', self.threads) + for thread in self.threads: + thread.suspend() + if block: + while True: + for thread in self.threads: + if not thread.suspend_reached: + LOG.debug('Waiting for thread to suspend: %s', thread) + if self.monitor.waitForAbort(0.1): + return True + break + else: + break + return xbmc.abortRequested + + def resume_threads(self, block=True): + """ + Resume all thread activity with or without blocking. + Returns True only if PKC shutdown requested + """ + LOG.debug('Resuming threads: %s', self.threads) + for thread in self.threads: + thread.resume() + if block: + while True: + for thread in self.threads: + if thread.suspend_reached: + LOG.debug('Waiting for thread to resume: %s', thread) + if self.monitor.waitForAbort(0.1): + return True + break + else: + break + return xbmc.abortRequested + + def stop_threads(self, block=True): + """ + Stop all threads. Will block until all threads are stopped + Will NOT quit if PKC should exit! + """ + LOG.debug('Killing threads: %s', self.threads) + for thread in self.threads: + thread.abort() + if block: + while self.threads: + LOG.debug('Waiting for threads to exit: %s', self.threads) + if xbmc.sleep(100): + return True def load(self): # Number of items to fetch and display in widgets @@ -48,11 +174,3 @@ class App(object): def load_entrypoint(self): self.fetch_pms_item_number = int(utils.settings('fetch_pms_item_number')) - - @property - def suspend_threads(self): - return self._suspend_threads or self.suspend - - @suspend_threads.setter - def suspend_threads(self, value): - self._suspend_threads = value diff --git a/resources/lib/app/connection.py b/resources/lib/app/connection.py index 2adcfb34..56ad192c 100644 --- a/resources/lib/app/connection.py +++ b/resources/lib/app/connection.py @@ -10,13 +10,25 @@ LOG = getLogger('PLEX.connection') class Connection(object): def __init__(self, entrypoint=False): + self.verify_ssl_cert = None + self.ssl_cert_path = None + self.machine_identifier = None + self.server_name = None + self.https = None + self.host = None + self.port = None + self.server = None + self.online = False + self.webserver_host = None + self.webserver_port = None + self.webserver_username = None + self.webserver_password = None + if entrypoint: self.load_entrypoint() else: self.load_webserver() self.load() - # TODO: Delete - self.pms_server = None # Token passed along, e.g. if playback initiated by Plex Companion. Might be # another user playing something! Token identifies user self.plex_transient_token = None @@ -57,7 +69,6 @@ class Connection(object): self.server = 'https://%s:%s' % (self.host, self.port) else: self.server = 'http://%s:%s' % (self.host, self.port) - utils.window('pms_server', value=self.server) self.online = False LOG.debug('Set server %s (%s) to %s', self.server_name, self.machine_identifier, self.server) @@ -85,4 +96,3 @@ class Connection(object): self.host = None self.port = None self.server = None - utils.window('pms_server', clear=True) diff --git a/resources/lib/app/libsync.py b/resources/lib/app/libsync.py index a994dadc..7bb7f82c 100644 --- a/resources/lib/app/libsync.py +++ b/resources/lib/app/libsync.py @@ -19,34 +19,71 @@ def remove_trailing_slash(path): class Sync(object): def __init__(self, entrypoint=False): - self.load() + # Direct Paths (True) or Addon Paths (False)? + self.direct_paths = None + # Is synching of Plex music enabled? + self.enable_music = None + # Do we sync artwork from the PMS to Kodi? + self.artwork = None + # Path remapping mechanism (e.g. smb paths) + # Do we replace \\myserver\path to smb://myserver/path? + self.replace_smb_path = None + # Do we generally remap? + self.remap_path = None + self.force_transcode_pix = None + # Mappings for REMAP_PATH: + self.remapSMBmovieOrg = None + self.remapSMBmovieNew = None + self.remapSMBtvOrg = None + self.remapSMBtvNew = None + self.remapSMBmusicOrg = None + self.remapSMBmusicNew = None + self.remapSMBphotoOrg = None + self.remapSMBphotoNew = None + # Escape path? + self.escape_path = None + # Shall we replace custom user ratings with the number of versions available? + self.indicate_media_versions = None + # Will sync movie trailer differently: either play trailer directly or show + # all the Plex extras for the user to choose + self.show_extras_instead_of_playing_trailer = None + # Only sync specific Plex playlists to Kodi? + self.sync_specific_plex_playlists = None + # Only sync specific Kodi playlists to Plex? + self.sync_specific_kodi_playlists = None + # Shall we show Kodi dialogs when synching? + self.sync_dialog = None + + # How often shall we sync? + self.full_sync_intervall = None + # Background Sync disabled? + self.background_sync_disabled = None + # How long shall we wait with synching a new item to make sure Plex got all + # metadata? + self.backgroundsync_saftymargin = None + # How many threads to download Plex metadata on sync? + self.sync_thread_number = None + + # Shall Kodi show dialogs for syncing/caching images? (e.g. images left + # to sync) + self.image_sync_notifications = None + # Do we need to run a special library scan? self.run_lib_scan = None # Set if user decided to cancel sync self.stop_sync = False - # Set during media playback if PKC should not do any syncs. Will NOT - # suspend synching of playstate progress - self.suspend_sync = False # Could we access the paths? self.path_verified = False - # Set if a Plex-Kodi DB sync is being done - along with - # window('plex_dbScan') set to 'true' - self.db_scan = False + + self.load() def load(self): - # Direct Paths (True) or Addon Paths (False)? self.direct_paths = utils.settings('useDirectPaths') == '1' - # Is synching of Plex music enabled? self.enable_music = utils.settings('enableMusic') == 'true' - # Do we sync artwork from the PMS to Kodi? self.artwork = utils.settings('usePlexArtwork') == 'true' - # Path remapping mechanism (e.g. smb paths) - # Do we replace \\myserver\path to smb://myserver/path? self.replace_smb_path = utils.settings('replaceSMB') == 'true' - # Do we generally remap? self.remap_path = utils.settings('remapSMB') == 'true' self.force_transcode_pix = utils.settings('force_transcode_pix') == 'true' - # Mappings for REMAP_PATH: self.remapSMBmovieOrg = remove_trailing_slash(utils.settings('remapSMBmovieOrg')) self.remapSMBmovieNew = remove_trailing_slash(utils.settings('remapSMBmovieNew')) self.remapSMBtvOrg = remove_trailing_slash(utils.settings('remapSMBtvOrg')) @@ -55,30 +92,16 @@ class Sync(object): self.remapSMBmusicNew = remove_trailing_slash(utils.settings('remapSMBmusicNew')) self.remapSMBphotoOrg = remove_trailing_slash(utils.settings('remapSMBphotoOrg')) self.remapSMBphotoNew = remove_trailing_slash(utils.settings('remapSMBphotoNew')) - # Escape path? self.escape_path = utils.settings('escapePath') == 'true' - # Shall we replace custom user ratings with the number of versions available? self.indicate_media_versions = utils.settings('indicate_media_versions') == "true" - # Will sync movie trailer differently: either play trailer directly or show - # all the Plex extras for the user to choose self.show_extras_instead_of_playing_trailer = utils.settings('showExtrasInsteadOfTrailer') == 'true' - # Only sync specific Plex playlists to Kodi? self.sync_specific_plex_playlists = utils.settings('syncSpecificPlexPlaylists') == 'true' - # Only sync specific Kodi playlists to Plex? self.sync_specific_kodi_playlists = utils.settings('syncSpecificKodiPlaylists') == 'true' - # Shall we show Kodi dialogs when synching? self.sync_dialog = utils.settings('dbSyncIndicator') == 'true' - # How often shall we sync? self.full_sync_intervall = int(utils.settings('fullSyncInterval')) * 60 - # Background Sync disabled? self.background_sync_disabled = utils.settings('enableBackgroundSync') == 'false' - # How long shall we wait with synching a new item to make sure Plex got all - # metadata? self.backgroundsync_saftymargin = int(utils.settings('backgroundsync_saftyMargin')) - # How many threads to download Plex metadata on sync? self.sync_thread_number = int(utils.settings('syncThreadNumber')) - # Shall Kodi show dialogs for syncing/caching images? (e.g. images left - # to sync) self.image_sync_notifications = utils.settings('imageSyncNotifications') == 'true' diff --git a/resources/lib/artwork.py b/resources/lib/artwork.py index 555cb70c..8c20127b 100644 --- a/resources/lib/artwork.py +++ b/resources/lib/artwork.py @@ -18,8 +18,6 @@ requests.packages.urllib3.disable_warnings() TIMEOUT = (35.1, 35.1) BATCH_SIZE = 500 -IMAGE_CACHING_SUSPENDS = [] - def double_urlencode(text): return quote_plus(quote_plus(text)) @@ -30,8 +28,14 @@ def double_urldecode(text): class ImageCachingThread(backgroundthread.KillableThread): + def __init__(self): + super(ImageCachingThread, self).__init__() + self.suspend_points = [self._suspended] + if not utils.settings('imageSyncDuringPlayback') == 'true': + self.suspend_points.append(app.APP.is_playing_video) + def isSuspended(self): - return any(IMAGE_CACHING_SUSPENDS) + return any(self.suspend_points) def _url_generator(self, kind, kodi_type): """ @@ -60,11 +64,13 @@ class ImageCachingThread(backgroundthread.KillableThread): def run(self): LOG.info("---===### Starting ImageCachingThread ###===---") + app.APP.register_caching_thread(self) try: self._run() except Exception: utils.ERROR() finally: + app.APP.deregister_caching_thread(self) LOG.info("---===### Stopped ImageCachingThread ###===---") def _run(self): @@ -74,14 +80,8 @@ class ImageCachingThread(backgroundthread.KillableThread): for kind in kinds: for kodi_type in ('poster', 'fanart'): for url in self._url_generator(kind, kodi_type): - if self.isCanceled(): + if self.wait_while_suspended(): return - while self.isSuspended(): - # Set in service.py - if self.isCanceled(): - # Abort was requested while waiting. We should exit - return - app.APP.monitor.waitForAbort(1) cache_url(url) # Toggles Image caching completed to Yes utils.settings('plex_status_image_caching', value=utils.lang(107)) diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 7e9ac465..18bb6722 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -77,7 +77,10 @@ class KillableThread(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs={}): self._canceled = False + # Set to True to set the thread to suspended self._suspended = False + # Thread will return True only if suspended state is reached + self.suspend_reached = False super(KillableThread, self).__init__(group, target, name, args, kwargs) def isCanceled(self): @@ -94,11 +97,16 @@ class KillableThread(threading.Thread): """ self._canceled = True - def suspend(self): + def suspend(self, block=False): """ Call to suspend this thread """ self._suspended = True + if block: + while not self.suspend_reached: + LOG.debug('Waiting for thread to suspend: %s', self) + if app.APP.monitor.waitForAbort(0.1): + return def resume(self): """ @@ -106,6 +114,25 @@ class KillableThread(threading.Thread): """ self._suspended = False + def wait_while_suspended(self): + """ + Blocks until thread is not suspended anymore or the thread should + exit. + Returns True only if the thread should exit (=isCanceled()) + """ + while self.isSuspended(): + try: + self.suspend_reached = True + # Set in service.py + if self.isCanceled(): + # Abort was requested while waiting. We should exit + return True + if app.APP.monitor.waitForAbort(0.1): + return True + finally: + self.suspend_reached = False + return self.isCanceled() + def isSuspended(self): """ Returns True if the thread is suspended diff --git a/resources/lib/kodimonitor.py b/resources/lib/kodimonitor.py index f28020b2..1d1b69f2 100644 --- a/resources/lib/kodimonitor.py +++ b/resources/lib/kodimonitor.py @@ -54,7 +54,8 @@ class KodiMonitor(xbmc.Monitor): """ LOG.debug('PKC settings change detected') # Assume that the user changed something so we can try to reconnect - app.APP.suspend = False + # app.APP.suspend = False + # app.APP.resume_threads(block=False) def onNotification(self, sender, method, data): """ @@ -69,7 +70,6 @@ class KodiMonitor(xbmc.Monitor): self.hack_replay = None if method == "Player.OnPlay": - app.SYNC.suspend_sync = True with app.APP.lock_playqueues: self.PlayBackStart(data) elif method == "Player.OnStop": @@ -87,7 +87,6 @@ class KodiMonitor(xbmc.Monitor): else: with app.APP.lock_playqueues: _playback_cleanup() - app.SYNC.suspend_sync = False elif method == 'Playlist.OnAdd': if 'item' in data and data['item'].get('type') == v.KODI_TYPE_SHOW: # Hitting the "browse" button on tv show info dialog diff --git a/resources/lib/library_sync/common.py b/resources/lib/library_sync/common.py index 3748c754..44368424 100644 --- a/resources/lib/library_sync/common.py +++ b/resources/lib/library_sync/common.py @@ -3,24 +3,38 @@ from __future__ import absolute_import, division, unicode_literals import xbmc -from .. import app, utils, variables as v +from .. import utils, variables as v PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and utils.settings('enablePlaylistSync') == 'true') -class libsync_mixin(object): - def isCanceled(self): - return (self._canceled or app.APP.stop_pkc or app.SYNC.stop_sync or - app.APP.suspend_threads or app.SYNC.suspend_sync) - - class fullsync_mixin(object): + def __init__(self): + self._canceled = False + + def abort(self): + """Hit method to terminate the thread""" + self._canceled = True + # Let's NOT suspend sync threads but immediately terminate them + suspend = abort + + @property + def suspend_reached(self): + """Since we're not suspending, we'll never set it to True""" + return False + + @suspend_reached.setter + def suspend_reached(self): + pass + + def resume(self): + """Obsolete since we're not suspending""" + pass + def isCanceled(self): - return (self._canceled or - app.APP.stop_pkc or - app.SYNC.stop_sync or - app.APP.suspend_threads) + """Check whether we should exit this thread""" + return self._canceled def update_kodi_library(video=True, music=True): diff --git a/resources/lib/library_sync/fanart.py b/resources/lib/library_sync/fanart.py index c9f9ff9a..f14a035b 100644 --- a/resources/lib/library_sync/fanart.py +++ b/resources/lib/library_sync/fanart.py @@ -2,7 +2,6 @@ from __future__ import absolute_import, division, unicode_literals from logging import getLogger -from . import common from ..plex_api import API from ..plex_db import PlexDB from ..kodi_db import KodiVideoDB @@ -19,13 +18,6 @@ PREFER_KODI_COLLECTION_ART = utils.settings('PreferKodiCollectionArt') == 'false BATCH_SIZE = 500 -def suspends(): - return (app.APP.suspend_threads or - app.SYNC.stop_sync or - app.SYNC.db_scan or - app.SYNC.suspend_sync) - - class FanartThread(backgroundthread.KillableThread): """ This will potentially take hours! @@ -36,16 +28,19 @@ class FanartThread(backgroundthread.KillableThread): super(FanartThread, self).__init__() def isSuspended(self): - return suspends() + return self._suspended or app.APP.is_playing_video def run(self): + LOG.info('Starting FanartThread') + app.APP.register_fanart_thread(self) try: self._run_internal() except Exception: utils.ERROR(notify=True) + finally: + app.APP.deregister_fanart_thread(self) def _run_internal(self): - LOG.info('Starting FanartThread') finished = False try: for typus in SUPPORTED_TYPES: @@ -63,12 +58,8 @@ class FanartThread(backgroundthread.KillableThread): BATCH_SIZE)) for plex_id in batch: # Do the actual, time-consuming processing - if self.isCanceled(): + if self.wait_while_suspended(): return - if self.isSuspended(): - if self.isCanceled(): - return - app.APP.monitor.waitForAbort(1) process_fanart(plex_id, typus, self.refresh) if len(batch) < BATCH_SIZE: break @@ -80,7 +71,7 @@ class FanartThread(backgroundthread.KillableThread): self.callback(finished) -class FanartTask(common.libsync_mixin, backgroundthread.Task): +class FanartTask(backgroundthread.Task): """ This task will also be executed while library sync is suspended! """ @@ -154,11 +145,7 @@ def process_fanart(plex_id, plex_type, refresh=False): setid, v.KODI_TYPE_SET) done = True - except utils.OperationalError: - # Caused if we reset the Plex database and this function has not yet - # returned - pass finally: - if done is True and not suspends(): + if done is True: with PlexDB() as plexdb: plexdb.set_fanart_synced(plex_id, plex_type) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 8a16c530..d85fc291 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -44,7 +44,6 @@ class FullSync(common.fullsync_mixin): """ repair=True: force sync EVERY item """ - self._canceled = False self.repair = repair self.callback = callback self.queue = None @@ -85,7 +84,7 @@ class FullSync(common.fullsync_mixin): '%s (%s)' % (self.section_name, self.section_type_text), '%s %s/%s' % (self.title, self.current, self.total)) - if app.APP.player.isPlayingVideo(): + if app.APP.is_playing_video: self.dialog.close() self.dialog = None @@ -394,14 +393,22 @@ class FullSync(common.fullsync_mixin): LOG.debug('Done deleting') return True - @utils.log_time def run(self): + app.APP.register_thread(self) + try: + self._run() + finally: + app.APP.deregister_thread(self) + LOG.info('Done full_sync') + + @utils.log_time + def _run(self): self.current_sync = timing.plex_now() # Delete playlist and video node files from Kodi utils.delete_playlists() utils.delete_nodes() # Get latest Plex libraries and build playlist and video node files - if not sections.sync_from_pms(): + if not sections.sync_from_pms(self): return self.successful = True try: @@ -436,7 +443,6 @@ class FullSync(common.fullsync_mixin): icon='{error}') if self.callback: self.callback(self.successful) - LOG.info('Done full_sync') def start(show_dialog, repair=False, callback=None): diff --git a/resources/lib/library_sync/sections.py b/resources/lib/library_sync/sections.py index 17f9060b..77517cbf 100644 --- a/resources/lib/library_sync/sections.py +++ b/resources/lib/library_sync/sections.py @@ -18,16 +18,23 @@ VNODES = videonodes.VideoNodes() PLAYLISTS = {} NODES = {} SECTIONS = [] +# Need a way to interrupt +IS_CANCELED = None -def isCanceled(): - return app.APP.stop_pkc or app.APP.suspend_threads or app.SYNC.stop_sync - - -def sync_from_pms(): +def sync_from_pms(parent_self): """ Sync the Plex library sections """ + global IS_CANCELED + IS_CANCELED = parent_self.isCanceled + try: + return _sync_from_pms() + finally: + IS_CANCELED = None + + +def _sync_from_pms(): sections = PF.get_plex_sections() try: sections.attrib @@ -226,7 +233,7 @@ def _delete_kodi_db_items(section_id, section_type): with kodi_context(texture_db=True) as kodidb: typus = context(None, plexdb=plexdb, kodidb=kodidb) for plex_id in plex_ids: - if isCanceled(): + if IS_CANCELED(): return False typus.remove(plex_id) if len(plex_ids) < BATCH_SIZE: diff --git a/resources/lib/library_sync/websocket.py b/resources/lib/library_sync/websocket.py index 7b255667..c74ac6db 100644 --- a/resources/lib/library_sync/websocket.py +++ b/resources/lib/library_sync/websocket.py @@ -23,10 +23,6 @@ WEBSOCKET_MESSAGES = [] PLAYSTATE_SESSIONS = {} -def interrupt_processing(): - return app.APP.stop_pkc or app.APP.suspend_threads or app.SYNC.stop_sync - - def multi_delete(input_list, delete_list): """ Deletes the list items of input_list at the positions in delete_list @@ -81,9 +77,6 @@ def process_websocket_messages(): update_kodi_video_library, update_kodi_music_library = False, False delete_list = [] for i, message in enumerate(WEBSOCKET_MESSAGES): - if interrupt_processing(): - # Chances are that Kodi gets shut down - break if message['state'] == 9: successful, video, music = process_delete_message(message) elif now - message['timestamp'] < app.SYNC.backgroundsync_saftymargin: diff --git a/resources/lib/playback.py b/resources/lib/playback.py index 0c2f5582..2c35ab78 100644 --- a/resources/lib/playback.py +++ b/resources/lib/playback.py @@ -533,7 +533,7 @@ def threaded_playback(kodi_playlist, startpos, offset): app.APP.player.play(kodi_playlist, None, False, startpos) if offset and offset != '0': i = 0 - while not app.APP.player.isPlaying(): + while not app.APP.is_playing: app.APP.monitor.waitForAbort(0.1) i += 1 if i > 100: diff --git a/resources/lib/playlists/__init__.py b/resources/lib/playlists/__init__.py index 59f650f6..9d35125c 100644 --- a/resources/lib/playlists/__init__.py +++ b/resources/lib/playlists/__init__.py @@ -43,7 +43,7 @@ IGNORE_PLEX_PLAYLIST_CHANGE = list() def isCanceled(): - return app.APP.stop_pkc or app.SYNC.stop_sync or app.APP.suspend_threads + return app.APP.stop_pkc or app.SYNC.stop_sync def kodi_playlist_monitor(): diff --git a/resources/lib/playqueue.py b/resources/lib/playqueue.py index 26daec86..3d35a983 100644 --- a/resources/lib/playqueue.py +++ b/resources/lib/playqueue.py @@ -97,12 +97,6 @@ class PlayqueueMonitor(backgroundthread.KillableThread): (playlist) are swapped. This is what this monitor is for. Don't replace this mechanism till Kodi's implementation of playlists has improved """ - def isSuspended(self): - """ - Returns True if the thread is suspended - """ - return self._suspended or app.APP.suspend_threads - def _compare_playqueues(self, playqueue, new): """ Used to poll the Kodi playqueue and update the Plex playqueue if needed @@ -193,11 +187,17 @@ class PlayqueueMonitor(backgroundthread.KillableThread): def run(self): LOG.info("----===## Starting PlayqueueMonitor ##===----") + app.APP.register_thread(self) + try: + self._run() + finally: + app.APP.deregister_thread(self) + LOG.info("----===## PlayqueueMonitor stopped ##===----") + + def _run(self): while not self.isCanceled(): - while self.isSuspended(): - if self.isCanceled(): - break - app.APP.monitor.waitForAbort(1) + if self.wait_while_suspended(): + return with app.APP.lock_playqueues: for playqueue in PLAYQUEUES: kodi_pl = js.playlist_get_items(playqueue.playlistid) @@ -212,4 +212,3 @@ class PlayqueueMonitor(backgroundthread.KillableThread): self._compare_playqueues(playqueue, kodi_pl) playqueue.old_kodi_pl = list(kodi_pl) app.APP.monitor.waitForAbort(0.2) - LOG.info("----===## PlayqueueMonitor stopped ##===----") diff --git a/resources/lib/plex_api.py b/resources/lib/plex_api.py index 29418253..aa90a012 100644 --- a/resources/lib/plex_api.py +++ b/resources/lib/plex_api.py @@ -1769,7 +1769,7 @@ class API(object): if force_check is False: # Validate the path is correct with user intervention if self.ask_to_validate(path): - app.SYNC.stop_sync = True + app.APP.stop_threads(block=False) path = None app.SYNC.path_verified = True else: diff --git a/resources/lib/plex_companion.py b/resources/lib/plex_companion.py index 01b7f259..4895a5a7 100644 --- a/resources/lib/plex_companion.py +++ b/resources/lib/plex_companion.py @@ -80,12 +80,6 @@ class PlexCompanion(backgroundthread.KillableThread): self.subscription_manager = None super(PlexCompanion, self).__init__() - def isSuspended(self): - """ - Returns True if the thread is suspended - """ - return self._suspended or app.APP.suspend - def _process_alexa(self, data): xml = PF.GetPlexMetadata(data['key']) try: @@ -245,6 +239,7 @@ class PlexCompanion(backgroundthread.KillableThread): """ Ensure that sockets will be closed no matter what """ + app.APP.register_thread(self) try: self._run() finally: @@ -257,7 +252,8 @@ class PlexCompanion(backgroundthread.KillableThread): self.httpd.socket.close() except AttributeError: pass - LOG.info("----===## Plex Companion stopped ##===----") + app.APP.deregister_thread(self) + LOG.info("----===## Plex Companion stopped ##===----") def _run(self): httpd = self.httpd @@ -303,10 +299,8 @@ class PlexCompanion(backgroundthread.KillableThread): # If we are not authorized, sleep # Otherwise, we trigger a download which leads to a # re-authorizations - while self.isSuspended(): - if self.isCanceled(): - break - app.APP.monitor.waitForAbort(1) + if self.wait_while_suspended(): + break try: message_count += 1 if httpd: diff --git a/resources/lib/plexbmchelper/listener.py b/resources/lib/plexbmchelper/listener.py index e08dca62..d50c2a20 100644 --- a/resources/lib/plexbmchelper/listener.py +++ b/resources/lib/plexbmchelper/listener.py @@ -134,7 +134,7 @@ class MyHandler(BaseHTTPRequestHandler): CLIENT_DICT[self.client_address[0]] = [] tracker = CLIENT_DICT[self.client_address[0]] tracker.append(self.client_address[1]) - while (not app.APP.player.isPlaying() and + while (not app.APP.is_playing and not app.APP.monitor.abortRequested() and sub_mgr.stop_sent_to_web and not (len(tracker) >= 4 and diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index 17ee3959..b9c7e440 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -7,7 +7,7 @@ import xbmc import xbmcgui from . import utils, clientinfo, timing -from . import initialsetup, artwork +from . import initialsetup from . import kodimonitor from . import sync from . import websocket_client @@ -27,9 +27,8 @@ LOG = logging.getLogger("PLEX.service") ############################################################################### WINDOW_PROPERTIES = ( - "plex_dbScan", "pms_token", "plex_token", "pms_server", - "plex_authenticated", "plex_restricteduser", "plex_allows_mediaDeletion", - "plexkodiconnect.command", "plex_result") + "pms_token", "plex_token", "plex_authenticated", "plex_restricteduser", + "plex_allows_mediaDeletion", "plexkodiconnect.command", "plex_result") # "Start from beginning", "Play from beginning" STRINGS = (utils.try_encode(utils.lang(12021)), @@ -126,9 +125,10 @@ class Service(): # Alert the user and suppress future warning if app.CONN.online: # PMS was online before - app.CONN.online = False - app.APP.suspend_threads = True LOG.warn("Plex Media Server went offline") + app.CONN.online = False + app.APP.suspend_threads() + LOG.debug('Threads suspended') if utils.settings('show_pms_offline') == 'true': utils.dialog('notification', utils.lang(33001), @@ -165,7 +165,7 @@ class Service(): if app.ACCOUNT.authenticated: # Server got offline when we were authenticated. # Hence resume threads - app.APP.suspend_threads = False + app.APP.resume_threads() app.CONN.online = True finally: self.connection_check_running = False @@ -175,22 +175,10 @@ class Service(): Ensures that lib sync threads are suspended; signs out user """ LOG.info('Log-out requested') - app.APP.suspend_threads = True - i = 0 - while app.SYNC.db_scan: - i += 1 - app.APP.monitor.waitForAbort(0.1) - if i > 150: - LOG.error('Could not stop library sync, aborting log-out') - # Failed to reset PMS and plex.tv connects. Try to restart Kodi - utils.messageDialog(utils.lang(29999), utils.lang(39208)) - # Resuming threads, just in case - app.APP.suspend_threads = False - return False - LOG.info('Successfully stopped library sync') + app.APP.suspend_threads() + LOG.info('Successfully suspended threads') app.ACCOUNT.log_out() LOG.info('User has been logged out') - return True def choose_pms_server(self, manual=False): LOG.info("Choosing PMS server requested, starting") @@ -202,15 +190,16 @@ class Service(): if not server: LOG.info('We did not connect to a new PMS, aborting') return False - LOG.info("User chose server %s", server['name']) - if server['machineIdentifier'] == app.CONN.machine_identifier: + LOG.info("User chose server %s with url %s", + server['name'], server['baseURL']) + if (server['machineIdentifier'] == app.CONN.machine_identifier and + server['baseURL'] == app.CONN.server): LOG.info('User chose old PMS to connect to') return False # Save changes to to file self.setup.save_pms_settings(server['baseURL'], server['token']) self.setup.write_pms_to_settings(server) - if not self.log_out(): - return False + self.log_out() # Wipe Kodi and Plex database as well as playlists and video nodes utils.wipe_database() app.CONN.load() @@ -220,12 +209,13 @@ class Service(): self.welcome_msg = False # Force a full sync app.SYNC.run_lib_scan = 'full' + # Enable the main loop to continue + app.APP.suspend = False LOG.info("Choosing new PMS complete") return True def switch_plex_user(self): - if not self.log_out(): - return False + self.log_out() # First remove playlists of old user utils.delete_playlists() # Remove video nodes @@ -234,6 +224,8 @@ class Service(): # Force full sync after login utils.settings('lastfullsync', value='0') app.SYNC.run_lib_scan = 'full' + # Enable the main loop to display user selection dialog + app.APP.suspend = False return True def toggle_plex_tv(self): @@ -246,6 +238,8 @@ class Service(): if self.setup.plex_tv_sign_in(): self.setup.write_credentials_to_settings() app.ACCOUNT.load() + # Enable the main loop to continue + app.APP.suspend = False def authenticate(self): """ @@ -265,22 +259,19 @@ class Service(): icon='{plex}', time=2000, sound=False) - app.APP.suspend_threads = False + app.APP.resume_threads() self.auth_running = False def enter_new_pms_address(self): server = self.setup.enter_new_pms_address() if not server: return - if not self.log_out(): - return False - # Save changes to to file + self.log_out() + # Save changes to to file self.setup.save_pms_settings(server['baseURL'], server['token']) self.setup.write_pms_to_settings(server) if not v.KODIVERSION >= 18: utils.settings('sslverify', value='false') - if not self.log_out(): - return False # Wipe Kodi and Plex database as well as playlists and video nodes utils.wipe_database() app.CONN.load() @@ -290,7 +281,9 @@ class Service(): self.welcome_msg = False # Force a full sync app.SYNC.run_lib_scan = 'full' - LOG.info("Choosing new PMS complete") + # Enable the main loop to continue + app.APP.suspend = False + LOG.info("Entering PMS address complete") return True def _do_auth(self): @@ -323,6 +316,8 @@ class Service(): if not user: LOG.info('No user received') app.APP.suspend = True + app.APP.suspend_threads() + LOG.debug('Threads suspended') return False username = user.title user_id = user.id @@ -355,7 +350,10 @@ class Service(): app.ACCOUNT.load() continue else: + LOG.debug('Suspending threads') app.APP.suspend = True + app.APP.suspend_threads() + LOG.debug('Threads suspended') return False elif res >= 400: LOG.error('Answer from PMS is not as expected') @@ -378,13 +376,6 @@ class Service(): app.init() app.APP.monitor = kodimonitor.KodiMonitor() app.APP.player = xbmc.Player() - artwork.IMAGE_CACHING_SUSPENDS = [ - app.APP.suspend_threads, - app.SYNC.stop_sync, - app.SYNC.db_scan - ] - if not utils.settings('imageSyncDuringPlayback') == 'true': - artwork.IMAGE_CACHING_SUSPENDS.append(app.SYNC.suspend_sync) # Initialize the PKC playqueues PQ.init_playqueues() @@ -505,7 +496,10 @@ class Service(): # EXITING PKC # Tell all threads to terminate (e.g. several lib sync threads) + LOG.debug('Aborting all threads') app.APP.stop_pkc = True + # Will block until threads have quit + app.APP.stop_threads() utils.window('plex_service_started', clear=True) LOG.info("======== STOP %s ========", v.ADDON_NAME) diff --git a/resources/lib/sync.py b/resources/lib/sync.py index dacca413..3b8214ba 100644 --- a/resources/lib/sync.py +++ b/resources/lib/sync.py @@ -15,19 +15,6 @@ if library_sync.PLAYLIST_SYNC_ENABLED: LOG = getLogger('PLEX.sync') -def set_library_scan_toggle(boolean=True): - """ - Make sure to hit this function before starting large scans - """ - if not boolean: - # Deactivate - app.SYNC.db_scan = False - utils.window('plex_dbScan', clear=True) - else: - app.SYNC.db_scan = True - utils.window('plex_dbScan', value="true") - - class Sync(backgroundthread.KillableThread): """ The one and only library sync thread. Spawn only 1! @@ -35,24 +22,18 @@ class Sync(backgroundthread.KillableThread): def __init__(self): self.sync_successful = False self.last_full_sync = 0 - self.fanart = None - # Show sync dialog even if user deactivated? - self.force_dialog = False + self.fanart_thread = None self.image_cache_thread = None # Lock used to wait on a full sync, e.g. on initial sync # self.lock = backgroundthread.threading.Lock() super(Sync, self).__init__() - def isSuspended(self): - return self._suspended or app.APP.suspend_threads - def triage_lib_scans(self): """ Decides what to do if app.SYNC.run_lib_scan has been set. E.g. manually triggered full or repair syncs """ if app.SYNC.run_lib_scan in ("full", "repair"): - set_library_scan_toggle() LOG.info('Full library scan requested, starting') self.start_library_sync(show_dialog=True, repair=app.SYNC.run_lib_scan == 'repair', @@ -89,23 +70,16 @@ class Sync(backgroundthread.KillableThread): """ self.sync_successful = successful self.last_full_sync = timing.unix_timestamp() - set_library_scan_toggle(boolean=False) if not successful: LOG.warn('Could not finish scheduled full sync') - # try: - # self.lock.release() - # except backgroundthread.threading.ThreadError: - # pass + app.APP.resume_fanart_thread() + app.APP.resume_caching_thread() def start_library_sync(self, show_dialog=None, repair=False, block=False): - set_library_scan_toggle(boolean=True) + app.APP.suspend_fanart_thread(block=True) + app.APP.suspend_caching_thread(block=True) show_dialog = show_dialog if show_dialog is not None else app.SYNC.sync_dialog library_sync.start(show_dialog, repair, self.on_library_scan_finished) - # if block: - # self.lock.acquire() - # Will block until scan is finished - # self.lock.acquire() - # self.lock.release() def start_fanart_download(self, refresh): if not utils.settings('FanartTV') == 'true': @@ -114,11 +88,11 @@ class Sync(backgroundthread.KillableThread): if not app.SYNC.artwork: LOG.info('Not synching Plex PMS artwork, not getting artwork') return False - elif self.fanart is None or not self.fanart.is_alive(): + elif self.fanart_thread is None or not self.fanart_thread.is_alive(): LOG.info('Start downloading additional fanart with refresh %s', refresh) - self.fanart = library_sync.FanartThread(self.on_fanart_download_finished, refresh) - self.fanart.start() + self.fanart_thread = library_sync.FanartThread(self.on_fanart_download_finished, refresh) + self.fanart_thread.start() return True else: LOG.info('Still downloading fanart') @@ -144,16 +118,18 @@ class Sync(backgroundthread.KillableThread): self.image_cache_thread.start() def run(self): + LOG.info("---===### Starting Sync Thread ###===---") + app.APP.register_thread(self) try: self._run_internal() except Exception: - app.SYNC.db_scan = False - utils.window('plex_dbScan', clear=True) utils.ERROR(txt='sync.py crashed', notify=True) raise + finally: + app.APP.deregister_thread(self) + LOG.info("###===--- Sync Thread Stopped ---===###") def _run_internal(self): - LOG.info("---===### Starting Sync Thread ###===---") install_sync_done = utils.settings('SyncInstallRunDone') == 'true' playlist_monitor = None initial_sync_done = False @@ -186,17 +162,10 @@ class Sync(backgroundthread.KillableThread): while not self.isCanceled(): # In the event the server goes offline - while self.isSuspended(): - if self.isCanceled(): - # Abort was requested while waiting. We should exit - LOG.info("###===--- Sync Thread Stopped ---===###") - return - app.APP.monitor.waitForAbort(1) - + if self.wait_while_suspended(): + return if not install_sync_done: # Very FIRST sync ever upon installation or reset of Kodi DB - set_library_scan_toggle() - self.force_dialog = True # Initialize time offset Kodi - PMS library_sync.sync_pms_time() last_time_sync = timing.unix_timestamp() @@ -217,7 +186,6 @@ class Sync(backgroundthread.KillableThread): else: LOG.error('Initial start-up full sync unsuccessful') app.APP.monitor.waitForAbort(1) - self.force_dialog = False xbmc.executebuiltin('InhibitIdleShutdown(false)') elif not initial_sync_done: @@ -237,13 +205,10 @@ class Sync(backgroundthread.KillableThread): app.APP.monitor.waitForAbort(1) # Currently no db scan, so we could start a new scan - elif app.SYNC.db_scan is False: + else: # Full scan was requested from somewhere else if app.SYNC.run_lib_scan is not None: - # Force-show dialogs since they are user-initiated - self.force_dialog = True self.triage_lib_scans() - self.force_dialog = False # Reset the flag app.SYNC.run_lib_scan = None continue @@ -251,7 +216,7 @@ class Sync(backgroundthread.KillableThread): # Standard syncs - don't force-show dialogs now = timing.unix_timestamp() if (now - self.last_full_sync > app.SYNC.full_sync_intervall and - not app.SYNC.suspend_sync): + not app.APP.is_playing_video): LOG.info('Doing scheduled full library scan') self.start_library_sync() elif now - last_time_sync > one_day_in_seconds: @@ -287,4 +252,3 @@ class Sync(backgroundthread.KillableThread): DU().stopSession() except AttributeError: pass - LOG.info("###===--- Sync Thread Stopped ---===###") diff --git a/resources/lib/utils.py b/resources/lib/utils.py index df5ba600..a0571c1a 100644 --- a/resources/lib/utils.py +++ b/resources/lib/utils.py @@ -223,7 +223,7 @@ def ERROR(txt='', hide_tb=False, notify=False, cancel_sync=False): LOG.error('Error encountered: %s - %s', txt, short) if cancel_sync: from . import app - app.SYNC.stop_sync = True + app.APP.stop_threads(block=False) if hide_tb: return short @@ -502,19 +502,7 @@ def reset(ask_user=True): return from . import app # first stop any db sync - app.APP.suspend_threads = True - count = 15 - while app.SYNC.db_scan: - LOG.info("Sync is running, will retry: %s...", count) - count -= 1 - if count == 0: - LOG.error('Could not stop PKC syncing process to reset the DB') - # Could not stop the database from running. Please try again later. - messageDialog(lang(29999), lang(39601)) - app.APP.suspend_threads = False - return - xbmc.sleep(1000) - + app.APP.suspend_threads() # Reset all PlexKodiConnect Addon settings? (this is usually NOT # recommended and unnecessary!) if ask_user and yesno_dialog(lang(29999), lang(39603)): diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index 8ed95d16..82c80881 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -47,20 +47,20 @@ class WebSocket(backgroundthread.KillableThread): def run(self): LOG.info("----===## Starting %s ##===----", self.__class__.__name__) + app.APP.register_thread(self) counter = 0 while not self.isCanceled(): # In the event the server goes offline - while self.isSuspended(): + if self.isSuspended(): # Set in service.py if self.ws is not None: self.ws.close() self.ws = None - if self.isCanceled(): + if self.wait_while_suspended(): # Abort was requested while waiting. We should exit LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__) return - app.APP.monitor.waitForAbort(1) try: self.process(*self.receive(self.ws)) except websocket.WebSocketTimeoutException: @@ -136,6 +136,7 @@ class WebSocket(backgroundthread.KillableThread): # Close websocket connection on shutdown if self.ws is not None: self.ws.close() + app.APP.deregister_thread(self) LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__) @@ -147,9 +148,7 @@ class PMS_Websocket(WebSocket): """ Returns True if the thread is suspended """ - return (self._suspended or - app.APP.suspend_threads or - app.SYNC.background_sync_disabled) + return self._suspended or app.SYNC.background_sync_disabled def getUri(self): if self.redirect_uri: @@ -201,11 +200,6 @@ class PMS_Websocket(WebSocket): # Drop everything we're not interested in if typus not in ('playing', 'timeline', 'activity'): return - elif typus == 'activity' and app.SYNC.db_scan is True: - # Only add to processing if PKC is NOT doing a lib scan (and thus - # possibly causing these reprocessing messages en mass) - LOG.debug('%s: Dropping message as PKC is currently synching', - self.__class__.__name__) else: # Put PMS message on queue and let libsync take care of it app.APP.websocket_queue.put(message)