From aa849f74575a0e0ef237c09f832f17b1042b719f Mon Sep 17 00:00:00 2001 From: tomkat83 Date: Tue, 26 Jan 2016 15:13:03 +0100 Subject: [PATCH] Added decorators for threads --- resources/lib/PlexCompanion.py | 15 ++-- resources/lib/entrypoint.py | 3 +- resources/lib/librarysync.py | 135 ++++++++++----------------------- resources/lib/userclient.py | 24 ++---- resources/lib/utils.py | 66 +++++++++++++--- service.py | 4 +- 6 files changed, 113 insertions(+), 134 deletions(-) diff --git a/resources/lib/PlexCompanion.py b/resources/lib/PlexCompanion.py index 346de3bf..307d348a 100644 --- a/resources/lib/PlexCompanion.py +++ b/resources/lib/PlexCompanion.py @@ -12,9 +12,9 @@ from plexbmchelper import listener, plexgdm, subscribers from plexbmchelper.settings import settings +@utils.ThreadMethods class PlexCompanion(threading.Thread): def __init__(self): - self._shouldStop = threading.Event() self.port = int(utils.settings('companionPort')) ci = clientinfo.ClientInfo() self.addonName = ci.getAddonName() @@ -38,13 +38,6 @@ class PlexCompanion(threading.Thread): className = self.__class__.__name__ utils.logMsg("%s %s" % (self.addonName, className), msg, lvl) - def stopClient(self): - # When emby for kodi terminates - self._shouldStop.set() - - def stopped(self): - return self._shouldStop.isSet() - def run(self): start_count = 0 while True: @@ -73,7 +66,11 @@ class PlexCompanion(threading.Thread): self.client.start_all() message_count = 0 is_running = False - while not self.stopped(): + while not self.threadStopped(): + while self.threadSuspended(): + if self.threadStopped(): + break + xbmc.sleep(3000) try: httpd.handle_request() diff --git a/resources/lib/entrypoint.py b/resources/lib/entrypoint.py index 06669e27..004a5df4 100644 --- a/resources/lib/entrypoint.py +++ b/resources/lib/entrypoint.py @@ -275,8 +275,7 @@ def switchPlexUser(): # utils.window('EmbyAdditionalUserImage.%s' % position, clear=True) utils.logMsg("PLEX", "Plex home user switch requested", 0) # Pause library sync thread - user needs to be auth in order to sync - lib = librarysync.LibrarySync() - lib.suspendThread() + utils.window('suspend_LibraryThread', value='true') # Log out currently signed in user: utils.window('emby_serverStatus', value="401") # Request lib sync to get user view data (e.g. watched/unwatched) diff --git a/resources/lib/librarysync.py b/resources/lib/librarysync.py index 18932b89..a0ff5781 100644 --- a/resources/lib/librarysync.py +++ b/resources/lib/librarysync.py @@ -26,6 +26,8 @@ import PlexAPI ################################################################################################## +@utils.ThreadMethodsStopsync +@utils.ThreadMethods class ThreadedGetMetadata(threading.Thread): """ Threaded download of Plex XML metadata for a certain library item. @@ -37,21 +39,17 @@ class ThreadedGetMetadata(threading.Thread): out_queue Queue.Queue() object where this thread will store the downloaded metadata XMLs as etree objects lock threading.Lock(), used for counting where we are - userStop Handle to a function where True is used to stop - this Thread """ - def __init__(self, queue, out_queue, lock, userStop): + def __init__(self, queue, out_queue, lock): self.queue = queue self.out_queue = out_queue self.lock = lock - self.userStop = userStop - self._shouldstop = threading.Event() threading.Thread.__init__(self) def run(self): plx = PlexAPI.PlexAPI() global getMetadataCount - while self.stopped() is False: + while self.threadStopped() is False: # grabs Plex item from queue try: updateItem = self.queue.get(block=False) @@ -76,13 +74,9 @@ class ThreadedGetMetadata(threading.Thread): # signals to queue job is done self.queue.task_done() - def stopThread(self): - self._shouldstop.set() - - def stopped(self): - return self._shouldstop.isSet() or self.userStop() - +@utils.ThreadMethodsStopsync +@utils.ThreadMethods class ThreadedProcessMetadata(threading.Thread): """ Not yet implemented - if ever. Only to be called by ONE thread! @@ -94,14 +88,11 @@ class ThreadedProcessMetadata(threading.Thread): itemType: as used to call functions in itemtypes.py e.g. 'Movies' => itemtypes.Movies() lock: threading.Lock(), used for counting where we are - userStop Handle to a function where True is used to stop this Thread """ - def __init__(self, queue, itemType, lock, userStop): + def __init__(self, queue, itemType, lock): self.queue = queue self.lock = lock self.itemType = itemType - self.userStop = userStop - self._shouldstop = threading.Event() threading.Thread.__init__(self) def run(self): @@ -110,7 +101,7 @@ class ThreadedProcessMetadata(threading.Thread): global processMetadataCount global processingViewName with itemFkt() as item: - while self.stopped() is False: + while self.threadStopped() is False: # grabs item from queue try: updateItem = self.queue.get(block=False) @@ -134,13 +125,9 @@ class ThreadedProcessMetadata(threading.Thread): # signals to queue job is done self.queue.task_done() - def stopThread(self): - self._shouldstop.set() - - def stopped(self): - return self._shouldstop.isSet() or self.userStop() - +@utils.ThreadMethodsStopsync +@utils.ThreadMethods class ThreadedShowSyncInfo(threading.Thread): """ Threaded class to show the Kodi statusbar of the metadata download. @@ -149,14 +136,11 @@ class ThreadedShowSyncInfo(threading.Thread): dialog xbmcgui.DialogProgressBG() object to show progress locks = [downloadLock, processLock] Locks() to the other threads total: Total number of items to get - userStop: function handle to stop thread """ - def __init__(self, dialog, locks, total, itemType, userStop): + def __init__(self, dialog, locks, total, itemType): self.locks = locks self.total = total - self.userStop = userStop self.addonName = clientinfo.ClientInfo().getAddonName() - self._shouldstop = threading.Event() self.dialog = dialog self.itemType = itemType threading.Thread.__init__(self) @@ -173,7 +157,7 @@ class ThreadedShowSyncInfo(threading.Thread): global processingViewName total = 2 * total totalProgress = 0 - while self.stopped() is False: + while self.threadStopped() is False: with downloadLock: getMetadataProgress = getMetadataCount with processLock: @@ -192,20 +176,13 @@ class ThreadedShowSyncInfo(threading.Thread): xbmc.sleep(500) self.dialog.close() - def stopThread(self): - self._shouldstop.set() - - def stopped(self): - return self._shouldstop.isSet() or self.userStop() - +@utils.ThreadMethodsStopsync +@utils.ThreadMethods class LibrarySync(threading.Thread): _shared_state = {} - stop_thread = False - suspend_thread = False - # Track websocketclient updates addedItems = [] updateItems = [] @@ -214,11 +191,9 @@ class LibrarySync(threading.Thread): forceLibraryUpdate = False refresh_views = False - def __init__(self): self.__dict__ = self._shared_state - self.monitor = xbmc.Monitor() self.clientInfo = clientinfo.ClientInfo() self.addonName = self.clientInfo.getAddonName() @@ -308,15 +283,6 @@ class LibrarySync(threading.Thread): % (overlap, lastSync), 1) utils.settings('LastIncrementalSync', value=lastSync) - def shouldStop(self): - # Checkpoint during the syncing process - if self.monitor.abortRequested(): - return True - elif utils.window('emby_shouldStop') == "true": - return True - else: # Keep going - return False - def initializeDBs(self): """ Run once during startup to verify that emby db exists. @@ -587,7 +553,7 @@ class LibrarySync(threading.Thread): # Skipping XML item 'title=All episodes' without a 'ratingKey' if not item.get('ratingKey', False): continue - if self.shouldStop(): + if self.threadStopped(): return False API = PlexAPI.API(item) plex_checksum = API.getChecksum() @@ -610,7 +576,7 @@ class LibrarySync(threading.Thread): # Only look at valid items = Plex library items if not item.get('ratingKey', False): continue - if self.shouldStop(): + if self.threadStopped(): return False API = PlexAPI.API(item) itemId = API.getKey() @@ -663,8 +629,7 @@ class LibrarySync(threading.Thread): for i in range(self.syncThreadNumber): thread = ThreadedGetMetadata(getMetadataQueue, processMetadataQueue, - getMetadataLock, - self.shouldStop) + getMetadataLock) thread.setDaemon(True) thread.start() threads.append(thread) @@ -672,8 +637,7 @@ class LibrarySync(threading.Thread): # Spawn one more thread to process Metadata, once downloaded thread = ThreadedProcessMetadata(processMetadataQueue, itemType, - processMetadataLock, - self.shouldStop) + processMetadataLock) thread.setDaemon(True) thread.start() threads.append(thread) @@ -682,13 +646,10 @@ class LibrarySync(threading.Thread): # Start one thread to show sync progress dialog = xbmcgui.DialogProgressBG() total = len(self.updatelist) - thread = ThreadedShowSyncInfo( - dialog, - [getMetadataLock, processMetadataLock], - total, - itemType, - self.shouldStop - ) + thread = ThreadedShowSyncInfo(dialog, + [getMetadataLock, processMetadataLock], + total, + itemType) thread.setDaemon(True) thread.start() threads.append(thread) @@ -703,9 +664,10 @@ class LibrarySync(threading.Thread): self.logMsg("Stop sent to all threads", 1) # Wait till threads are indeed dead for thread in threads: - thread.join(15.0) + thread.join(5.0) if thread.isAlive(): self.logMsg("Could not terminate thread", -1) + # Make sure dialog window is closed if dialog: dialog.close() self.logMsg("=====================", 1) @@ -743,7 +705,7 @@ class LibrarySync(threading.Thread): ##### PROCESS MOVIES ##### self.updatelist = [] for view in views: - if self.shouldStop(): + if self.threadStopped(): return False # Get items per view viewId = view['id'] @@ -802,7 +764,7 @@ class LibrarySync(threading.Thread): for view in views: - if self.shouldStop(): + if self.threadStopped(): return False # Get items per view @@ -825,7 +787,7 @@ class LibrarySync(threading.Thread): count = 0 for embymvideo in embymvideos: # Process individual musicvideo - if self.shouldStop(): + if self.threadStopped(): return False title = embymvideo['Name'] @@ -879,7 +841,7 @@ class LibrarySync(threading.Thread): ##### PROCESS TV Shows ##### self.updatelist = [] for view in views: - if self.shouldStop(): + if self.threadStopped(): return False # Get items per view viewId = view['id'] @@ -899,7 +861,7 @@ class LibrarySync(threading.Thread): ##### PROCESS TV Seasons ##### # Cycle through tv shows for tvShowId in allPlexTvShowsId: - if self.shouldStop(): + if self.threadStopped(): return False # Grab all seasons to tvshow from PMS seasons = plx.GetAllPlexChildren(tvShowId) @@ -915,7 +877,7 @@ class LibrarySync(threading.Thread): ##### PROCESS TV Episodes ##### # Cycle through tv shows for view in views: - if self.shouldStop(): + if self.threadStopped(): return False # Grab all episodes to tvshow from PMS episodes = plx.GetAllPlexLeaves(view['id']) @@ -982,7 +944,7 @@ class LibrarySync(threading.Thread): count = 0 for embyitem in embyitems: # Process individual item - if self.shouldStop(): + if self.threadStopped(): return False title = embyitem['Name'] @@ -1128,18 +1090,17 @@ class LibrarySync(threading.Thread): def run_internal(self): startupComplete = False - monitor = self.monitor self.logMsg("---===### Starting LibrarySync ###===---", 0) - - while not monitor.abortRequested(): + while not self.threadStopped(): # In the event the server goes offline - while self.suspend_thread: + while self.threadSuspended(): # Set in service.py - if monitor.waitForAbort(5): + if self.threadStopped(): # Abort was requested while waiting. We should exit break + xbmc.sleep(3000) if (utils.window('emby_dbCheck') != "true" and utils.settings('SyncInstallRunDone') == "true"): @@ -1217,29 +1178,13 @@ class LibrarySync(threading.Thread): librarySync = self.startSync() self.logMsg("SyncDatabase onWake (finished) %s" % librarySync, 0) - if self.stop_thread: + if self.threadStopped(): # Set in service.py self.logMsg("Service terminated thread.", 2) break - if monitor.waitForAbort(1): - # Abort was requested while waiting. We should exit - break - self.logMsg("###===--- LibrarySync Stopped ---===###", 0) - def stopThread(self): - self.stop_thread = True - self.logMsg("Ending thread...", 2) - - def suspendThread(self): - self.suspend_thread = True - self.logMsg("Pausing thread...", 0) - - def resumeThread(self): - self.suspend_thread = False - self.logMsg("Resuming thread...", 0) - class ManualSync(LibrarySync): @@ -1278,7 +1223,7 @@ class ManualSync(LibrarySync): ##### PROCESS MOVIES ##### for view in views: - if self.shouldStop(): + if self.threadStopped(): return False # Get items per view @@ -1293,7 +1238,7 @@ class ManualSync(LibrarySync): all_embymovies = emby.getMovies(viewId, basic=True, dialog=pdialog) for embymovie in all_embymovies['Items']: - if self.shouldStop(): + if self.threadStopped(): return False API = api.API(embymovie) @@ -1316,7 +1261,7 @@ class ManualSync(LibrarySync): count = 0 for embymovie in embymovies: # Process individual movies - if self.shouldStop(): + if self.threadStopped(): return False title = embymovie['Name'] @@ -1338,7 +1283,7 @@ class ManualSync(LibrarySync): for boxset in boxsets['Items']: - if self.shouldStop(): + if self.threadStopped(): return False # Boxset has no real userdata, so using etag to compare diff --git a/resources/lib/userclient.py b/resources/lib/userclient.py index f0bdb5d5..1bfc151c 100644 --- a/resources/lib/userclient.py +++ b/resources/lib/userclient.py @@ -19,12 +19,12 @@ import librarysync ################################################################################################## +@utils.ThreadMethods class UserClient(threading.Thread): # Borg - multiple instances, shared state __shared_state = {} - stopClient = False auth = True retry = 0 @@ -366,10 +366,13 @@ class UserClient(threading.Thread): def run(self): - monitor = xbmc.Monitor() self.logMsg("----===## Starting UserClient ##===----", 0) - while not monitor.abortRequested(): + while not self.threadStopped(): + while self.threadSuspended(): + if self.threadStopped(): + break + xbmc.sleep(3000) status = utils.window('emby_serverStatus') if status: @@ -406,18 +409,5 @@ class UserClient(threading.Thread): self.logMsg("Username found: %s" % username, 2) self.auth = True - - if self.stopClient == True: - # If stopping the client didn't work - break - - if monitor.waitForAbort(1): - # Abort was requested while waiting. We should exit - break - - self.doUtils.stopSession() + self.doUtils.stopSession() self.logMsg("##===---- UserClient Stopped ----===##", 0) - - def stopClient(self): - # When emby for kodi terminates - self.stopClient = True \ No newline at end of file diff --git a/resources/lib/utils.py b/resources/lib/utils.py index 8e0b9c81..75c0246d 100644 --- a/resources/lib/utils.py +++ b/resources/lib/utils.py @@ -20,17 +20,65 @@ import clientinfo ################################################################################################# -def borg(cls): +def ThreadMethodsStopsync(cls): """ - Dekorator to turn a class into a borg class with an added "@utils.borg" - """ - cls._state = {} - orig_init = cls.__init__ + Decorator to replace stopThread method to include the Kodi window property + 'emby_shouldStop' - def new_init(self, *args, **kwargs): - self.__dict__ = cls._state - orig_init(self, *args, **kwargs) - cls.__init__ = new_init + Use with any library sync threads. @ThreadMethods still required FIRST + """ + def threadStopped(self): + return (self._threadStopped or + self._abortMonitor.abortRequested() or + window('emby_shouldStop') == "true") + cls.threadStopped = threadStopped + return cls + + +def ThreadMethods(cls): + """ + Decorator to add the following methods to a threading class: + + suspendThread(): pauses the thread + resumeThread(): resumes the thread + stopThread(): stopps the thread + + threadSuspended(): returns True if thread is suspend_thread + threadStopped(): returns True if thread is stopped (or should stop ;-)) + ALSO stops if Kodi is exited + + Also adds the following class attributes: + _threadStopped + _threadSuspended + _abortMonitor = xbmc.Monitor() (to check for premature Kodi exit) + """ + # Attach new attributes to class + cls._threadStopped = False + cls._threadSuspended = False + cls._abortMonitor = xbmc.Monitor() + + # Define new class methods and attach them to class + def stopThread(self): + self._threadStopped = True + cls.stopThread = stopThread + + def suspendThread(self): + self._threadSuspended = True + cls.suspendThread = suspendThread + + def resumeThread(self): + self._threadSuspended = False + cls.resumeThread = resumeThread + + def threadSuspended(self): + return self._threadSuspended + cls.threadSuspended = threadSuspended + + def threadStopped(self): + return self._threadStopped or self._abortMonitor.abortRequested() + cls.threadStopped = threadStopped + + # Return class to render this a decorator return cls diff --git a/service.py b/service.py index 05d15573..3f1f33d5 100644 --- a/service.py +++ b/service.py @@ -293,7 +293,7 @@ class Service(): ##### Emby thread is terminating. ##### if self.plexCompanion_running: - plexCompanion.stopClient() + plexCompanion.stopThread() if self.library_running: library.stopThread() @@ -302,7 +302,7 @@ class Service(): # ws.stopClient() if self.userclient_running: - user.stopClient() + user.stopThread() self.logMsg("======== STOP %s ========" % self.addonName, 0)