Added decorators for threads

This commit is contained in:
tomkat83 2016-01-26 15:13:03 +01:00
parent 079f43c644
commit aa849f7457
6 changed files with 113 additions and 134 deletions

View file

@ -12,9 +12,9 @@ from plexbmchelper import listener, plexgdm, subscribers
from plexbmchelper.settings import settings from plexbmchelper.settings import settings
@utils.ThreadMethods
class PlexCompanion(threading.Thread): class PlexCompanion(threading.Thread):
def __init__(self): def __init__(self):
self._shouldStop = threading.Event()
self.port = int(utils.settings('companionPort')) self.port = int(utils.settings('companionPort'))
ci = clientinfo.ClientInfo() ci = clientinfo.ClientInfo()
self.addonName = ci.getAddonName() self.addonName = ci.getAddonName()
@ -38,13 +38,6 @@ class PlexCompanion(threading.Thread):
className = self.__class__.__name__ className = self.__class__.__name__
utils.logMsg("%s %s" % (self.addonName, className), msg, lvl) utils.logMsg("%s %s" % (self.addonName, className), msg, lvl)
def stopClient(self):
# When emby for kodi terminates
self._shouldStop.set()
def stopped(self):
return self._shouldStop.isSet()
def run(self): def run(self):
start_count = 0 start_count = 0
while True: while True:
@ -73,7 +66,11 @@ class PlexCompanion(threading.Thread):
self.client.start_all() self.client.start_all()
message_count = 0 message_count = 0
is_running = False is_running = False
while not self.stopped(): while not self.threadStopped():
while self.threadSuspended():
if self.threadStopped():
break
xbmc.sleep(3000)
try: try:
httpd.handle_request() httpd.handle_request()

View file

@ -275,8 +275,7 @@ def switchPlexUser():
# utils.window('EmbyAdditionalUserImage.%s' % position, clear=True) # utils.window('EmbyAdditionalUserImage.%s' % position, clear=True)
utils.logMsg("PLEX", "Plex home user switch requested", 0) utils.logMsg("PLEX", "Plex home user switch requested", 0)
# Pause library sync thread - user needs to be auth in order to sync # Pause library sync thread - user needs to be auth in order to sync
lib = librarysync.LibrarySync() utils.window('suspend_LibraryThread', value='true')
lib.suspendThread()
# Log out currently signed in user: # Log out currently signed in user:
utils.window('emby_serverStatus', value="401") utils.window('emby_serverStatus', value="401")
# Request lib sync to get user view data (e.g. watched/unwatched) # Request lib sync to get user view data (e.g. watched/unwatched)

View file

@ -26,6 +26,8 @@ import PlexAPI
################################################################################################## ##################################################################################################
@utils.ThreadMethodsStopsync
@utils.ThreadMethods
class ThreadedGetMetadata(threading.Thread): class ThreadedGetMetadata(threading.Thread):
""" """
Threaded download of Plex XML metadata for a certain library item. Threaded download of Plex XML metadata for a certain library item.
@ -37,21 +39,17 @@ class ThreadedGetMetadata(threading.Thread):
out_queue Queue.Queue() object where this thread will store out_queue Queue.Queue() object where this thread will store
the downloaded metadata XMLs as etree objects the downloaded metadata XMLs as etree objects
lock threading.Lock(), used for counting where we are lock threading.Lock(), used for counting where we are
userStop Handle to a function where True is used to stop
this Thread
""" """
def __init__(self, queue, out_queue, lock, userStop): def __init__(self, queue, out_queue, lock):
self.queue = queue self.queue = queue
self.out_queue = out_queue self.out_queue = out_queue
self.lock = lock self.lock = lock
self.userStop = userStop
self._shouldstop = threading.Event()
threading.Thread.__init__(self) threading.Thread.__init__(self)
def run(self): def run(self):
plx = PlexAPI.PlexAPI() plx = PlexAPI.PlexAPI()
global getMetadataCount global getMetadataCount
while self.stopped() is False: while self.threadStopped() is False:
# grabs Plex item from queue # grabs Plex item from queue
try: try:
updateItem = self.queue.get(block=False) updateItem = self.queue.get(block=False)
@ -76,13 +74,9 @@ class ThreadedGetMetadata(threading.Thread):
# signals to queue job is done # signals to queue job is done
self.queue.task_done() self.queue.task_done()
def stopThread(self):
self._shouldstop.set()
def stopped(self):
return self._shouldstop.isSet() or self.userStop()
@utils.ThreadMethodsStopsync
@utils.ThreadMethods
class ThreadedProcessMetadata(threading.Thread): class ThreadedProcessMetadata(threading.Thread):
""" """
Not yet implemented - if ever. Only to be called by ONE thread! Not yet implemented - if ever. Only to be called by ONE thread!
@ -94,14 +88,11 @@ class ThreadedProcessMetadata(threading.Thread):
itemType: as used to call functions in itemtypes.py itemType: as used to call functions in itemtypes.py
e.g. 'Movies' => itemtypes.Movies() e.g. 'Movies' => itemtypes.Movies()
lock: threading.Lock(), used for counting where we are lock: threading.Lock(), used for counting where we are
userStop Handle to a function where True is used to stop this Thread
""" """
def __init__(self, queue, itemType, lock, userStop): def __init__(self, queue, itemType, lock):
self.queue = queue self.queue = queue
self.lock = lock self.lock = lock
self.itemType = itemType self.itemType = itemType
self.userStop = userStop
self._shouldstop = threading.Event()
threading.Thread.__init__(self) threading.Thread.__init__(self)
def run(self): def run(self):
@ -110,7 +101,7 @@ class ThreadedProcessMetadata(threading.Thread):
global processMetadataCount global processMetadataCount
global processingViewName global processingViewName
with itemFkt() as item: with itemFkt() as item:
while self.stopped() is False: while self.threadStopped() is False:
# grabs item from queue # grabs item from queue
try: try:
updateItem = self.queue.get(block=False) updateItem = self.queue.get(block=False)
@ -134,13 +125,9 @@ class ThreadedProcessMetadata(threading.Thread):
# signals to queue job is done # signals to queue job is done
self.queue.task_done() self.queue.task_done()
def stopThread(self):
self._shouldstop.set()
def stopped(self):
return self._shouldstop.isSet() or self.userStop()
@utils.ThreadMethodsStopsync
@utils.ThreadMethods
class ThreadedShowSyncInfo(threading.Thread): class ThreadedShowSyncInfo(threading.Thread):
""" """
Threaded class to show the Kodi statusbar of the metadata download. Threaded class to show the Kodi statusbar of the metadata download.
@ -149,14 +136,11 @@ class ThreadedShowSyncInfo(threading.Thread):
dialog xbmcgui.DialogProgressBG() object to show progress dialog xbmcgui.DialogProgressBG() object to show progress
locks = [downloadLock, processLock] Locks() to the other threads locks = [downloadLock, processLock] Locks() to the other threads
total: Total number of items to get total: Total number of items to get
userStop: function handle to stop thread
""" """
def __init__(self, dialog, locks, total, itemType, userStop): def __init__(self, dialog, locks, total, itemType):
self.locks = locks self.locks = locks
self.total = total self.total = total
self.userStop = userStop
self.addonName = clientinfo.ClientInfo().getAddonName() self.addonName = clientinfo.ClientInfo().getAddonName()
self._shouldstop = threading.Event()
self.dialog = dialog self.dialog = dialog
self.itemType = itemType self.itemType = itemType
threading.Thread.__init__(self) threading.Thread.__init__(self)
@ -173,7 +157,7 @@ class ThreadedShowSyncInfo(threading.Thread):
global processingViewName global processingViewName
total = 2 * total total = 2 * total
totalProgress = 0 totalProgress = 0
while self.stopped() is False: while self.threadStopped() is False:
with downloadLock: with downloadLock:
getMetadataProgress = getMetadataCount getMetadataProgress = getMetadataCount
with processLock: with processLock:
@ -192,20 +176,13 @@ class ThreadedShowSyncInfo(threading.Thread):
xbmc.sleep(500) xbmc.sleep(500)
self.dialog.close() self.dialog.close()
def stopThread(self):
self._shouldstop.set()
def stopped(self):
return self._shouldstop.isSet() or self.userStop()
@utils.ThreadMethodsStopsync
@utils.ThreadMethods
class LibrarySync(threading.Thread): class LibrarySync(threading.Thread):
_shared_state = {} _shared_state = {}
stop_thread = False
suspend_thread = False
# Track websocketclient updates # Track websocketclient updates
addedItems = [] addedItems = []
updateItems = [] updateItems = []
@ -214,11 +191,9 @@ class LibrarySync(threading.Thread):
forceLibraryUpdate = False forceLibraryUpdate = False
refresh_views = False refresh_views = False
def __init__(self): def __init__(self):
self.__dict__ = self._shared_state self.__dict__ = self._shared_state
self.monitor = xbmc.Monitor()
self.clientInfo = clientinfo.ClientInfo() self.clientInfo = clientinfo.ClientInfo()
self.addonName = self.clientInfo.getAddonName() self.addonName = self.clientInfo.getAddonName()
@ -308,15 +283,6 @@ class LibrarySync(threading.Thread):
% (overlap, lastSync), 1) % (overlap, lastSync), 1)
utils.settings('LastIncrementalSync', value=lastSync) utils.settings('LastIncrementalSync', value=lastSync)
def shouldStop(self):
# Checkpoint during the syncing process
if self.monitor.abortRequested():
return True
elif utils.window('emby_shouldStop') == "true":
return True
else: # Keep going
return False
def initializeDBs(self): def initializeDBs(self):
""" """
Run once during startup to verify that emby db exists. Run once during startup to verify that emby db exists.
@ -587,7 +553,7 @@ class LibrarySync(threading.Thread):
# Skipping XML item 'title=All episodes' without a 'ratingKey' # Skipping XML item 'title=All episodes' without a 'ratingKey'
if not item.get('ratingKey', False): if not item.get('ratingKey', False):
continue continue
if self.shouldStop(): if self.threadStopped():
return False return False
API = PlexAPI.API(item) API = PlexAPI.API(item)
plex_checksum = API.getChecksum() plex_checksum = API.getChecksum()
@ -610,7 +576,7 @@ class LibrarySync(threading.Thread):
# Only look at valid items = Plex library items # Only look at valid items = Plex library items
if not item.get('ratingKey', False): if not item.get('ratingKey', False):
continue continue
if self.shouldStop(): if self.threadStopped():
return False return False
API = PlexAPI.API(item) API = PlexAPI.API(item)
itemId = API.getKey() itemId = API.getKey()
@ -663,8 +629,7 @@ class LibrarySync(threading.Thread):
for i in range(self.syncThreadNumber): for i in range(self.syncThreadNumber):
thread = ThreadedGetMetadata(getMetadataQueue, thread = ThreadedGetMetadata(getMetadataQueue,
processMetadataQueue, processMetadataQueue,
getMetadataLock, getMetadataLock)
self.shouldStop)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
@ -672,8 +637,7 @@ class LibrarySync(threading.Thread):
# Spawn one more thread to process Metadata, once downloaded # Spawn one more thread to process Metadata, once downloaded
thread = ThreadedProcessMetadata(processMetadataQueue, thread = ThreadedProcessMetadata(processMetadataQueue,
itemType, itemType,
processMetadataLock, processMetadataLock)
self.shouldStop)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
@ -682,13 +646,10 @@ class LibrarySync(threading.Thread):
# Start one thread to show sync progress # Start one thread to show sync progress
dialog = xbmcgui.DialogProgressBG() dialog = xbmcgui.DialogProgressBG()
total = len(self.updatelist) total = len(self.updatelist)
thread = ThreadedShowSyncInfo( thread = ThreadedShowSyncInfo(dialog,
dialog, [getMetadataLock, processMetadataLock],
[getMetadataLock, processMetadataLock], total,
total, itemType)
itemType,
self.shouldStop
)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
@ -703,9 +664,10 @@ class LibrarySync(threading.Thread):
self.logMsg("Stop sent to all threads", 1) self.logMsg("Stop sent to all threads", 1)
# Wait till threads are indeed dead # Wait till threads are indeed dead
for thread in threads: for thread in threads:
thread.join(15.0) thread.join(5.0)
if thread.isAlive(): if thread.isAlive():
self.logMsg("Could not terminate thread", -1) self.logMsg("Could not terminate thread", -1)
# Make sure dialog window is closed
if dialog: if dialog:
dialog.close() dialog.close()
self.logMsg("=====================", 1) self.logMsg("=====================", 1)
@ -743,7 +705,7 @@ class LibrarySync(threading.Thread):
##### PROCESS MOVIES ##### ##### PROCESS MOVIES #####
self.updatelist = [] self.updatelist = []
for view in views: for view in views:
if self.shouldStop(): if self.threadStopped():
return False return False
# Get items per view # Get items per view
viewId = view['id'] viewId = view['id']
@ -802,7 +764,7 @@ class LibrarySync(threading.Thread):
for view in views: for view in views:
if self.shouldStop(): if self.threadStopped():
return False return False
# Get items per view # Get items per view
@ -825,7 +787,7 @@ class LibrarySync(threading.Thread):
count = 0 count = 0
for embymvideo in embymvideos: for embymvideo in embymvideos:
# Process individual musicvideo # Process individual musicvideo
if self.shouldStop(): if self.threadStopped():
return False return False
title = embymvideo['Name'] title = embymvideo['Name']
@ -879,7 +841,7 @@ class LibrarySync(threading.Thread):
##### PROCESS TV Shows ##### ##### PROCESS TV Shows #####
self.updatelist = [] self.updatelist = []
for view in views: for view in views:
if self.shouldStop(): if self.threadStopped():
return False return False
# Get items per view # Get items per view
viewId = view['id'] viewId = view['id']
@ -899,7 +861,7 @@ class LibrarySync(threading.Thread):
##### PROCESS TV Seasons ##### ##### PROCESS TV Seasons #####
# Cycle through tv shows # Cycle through tv shows
for tvShowId in allPlexTvShowsId: for tvShowId in allPlexTvShowsId:
if self.shouldStop(): if self.threadStopped():
return False return False
# Grab all seasons to tvshow from PMS # Grab all seasons to tvshow from PMS
seasons = plx.GetAllPlexChildren(tvShowId) seasons = plx.GetAllPlexChildren(tvShowId)
@ -915,7 +877,7 @@ class LibrarySync(threading.Thread):
##### PROCESS TV Episodes ##### ##### PROCESS TV Episodes #####
# Cycle through tv shows # Cycle through tv shows
for view in views: for view in views:
if self.shouldStop(): if self.threadStopped():
return False return False
# Grab all episodes to tvshow from PMS # Grab all episodes to tvshow from PMS
episodes = plx.GetAllPlexLeaves(view['id']) episodes = plx.GetAllPlexLeaves(view['id'])
@ -982,7 +944,7 @@ class LibrarySync(threading.Thread):
count = 0 count = 0
for embyitem in embyitems: for embyitem in embyitems:
# Process individual item # Process individual item
if self.shouldStop(): if self.threadStopped():
return False return False
title = embyitem['Name'] title = embyitem['Name']
@ -1128,18 +1090,17 @@ class LibrarySync(threading.Thread):
def run_internal(self): def run_internal(self):
startupComplete = False startupComplete = False
monitor = self.monitor
self.logMsg("---===### Starting LibrarySync ###===---", 0) self.logMsg("---===### Starting LibrarySync ###===---", 0)
while not self.threadStopped():
while not monitor.abortRequested():
# In the event the server goes offline # In the event the server goes offline
while self.suspend_thread: while self.threadSuspended():
# Set in service.py # Set in service.py
if monitor.waitForAbort(5): if self.threadStopped():
# Abort was requested while waiting. We should exit # Abort was requested while waiting. We should exit
break break
xbmc.sleep(3000)
if (utils.window('emby_dbCheck') != "true" and if (utils.window('emby_dbCheck') != "true" and
utils.settings('SyncInstallRunDone') == "true"): utils.settings('SyncInstallRunDone') == "true"):
@ -1217,29 +1178,13 @@ class LibrarySync(threading.Thread):
librarySync = self.startSync() librarySync = self.startSync()
self.logMsg("SyncDatabase onWake (finished) %s" % librarySync, 0) self.logMsg("SyncDatabase onWake (finished) %s" % librarySync, 0)
if self.stop_thread: if self.threadStopped():
# Set in service.py # Set in service.py
self.logMsg("Service terminated thread.", 2) self.logMsg("Service terminated thread.", 2)
break break
if monitor.waitForAbort(1):
# Abort was requested while waiting. We should exit
break
self.logMsg("###===--- LibrarySync Stopped ---===###", 0) self.logMsg("###===--- LibrarySync Stopped ---===###", 0)
def stopThread(self):
self.stop_thread = True
self.logMsg("Ending thread...", 2)
def suspendThread(self):
self.suspend_thread = True
self.logMsg("Pausing thread...", 0)
def resumeThread(self):
self.suspend_thread = False
self.logMsg("Resuming thread...", 0)
class ManualSync(LibrarySync): class ManualSync(LibrarySync):
@ -1278,7 +1223,7 @@ class ManualSync(LibrarySync):
##### PROCESS MOVIES ##### ##### PROCESS MOVIES #####
for view in views: for view in views:
if self.shouldStop(): if self.threadStopped():
return False return False
# Get items per view # Get items per view
@ -1293,7 +1238,7 @@ class ManualSync(LibrarySync):
all_embymovies = emby.getMovies(viewId, basic=True, dialog=pdialog) all_embymovies = emby.getMovies(viewId, basic=True, dialog=pdialog)
for embymovie in all_embymovies['Items']: for embymovie in all_embymovies['Items']:
if self.shouldStop(): if self.threadStopped():
return False return False
API = api.API(embymovie) API = api.API(embymovie)
@ -1316,7 +1261,7 @@ class ManualSync(LibrarySync):
count = 0 count = 0
for embymovie in embymovies: for embymovie in embymovies:
# Process individual movies # Process individual movies
if self.shouldStop(): if self.threadStopped():
return False return False
title = embymovie['Name'] title = embymovie['Name']
@ -1338,7 +1283,7 @@ class ManualSync(LibrarySync):
for boxset in boxsets['Items']: for boxset in boxsets['Items']:
if self.shouldStop(): if self.threadStopped():
return False return False
# Boxset has no real userdata, so using etag to compare # Boxset has no real userdata, so using etag to compare

View file

@ -19,12 +19,12 @@ import librarysync
################################################################################################## ##################################################################################################
@utils.ThreadMethods
class UserClient(threading.Thread): class UserClient(threading.Thread):
# Borg - multiple instances, shared state # Borg - multiple instances, shared state
__shared_state = {} __shared_state = {}
stopClient = False
auth = True auth = True
retry = 0 retry = 0
@ -366,10 +366,13 @@ class UserClient(threading.Thread):
def run(self): def run(self):
monitor = xbmc.Monitor()
self.logMsg("----===## Starting UserClient ##===----", 0) self.logMsg("----===## Starting UserClient ##===----", 0)
while not monitor.abortRequested(): while not self.threadStopped():
while self.threadSuspended():
if self.threadStopped():
break
xbmc.sleep(3000)
status = utils.window('emby_serverStatus') status = utils.window('emby_serverStatus')
if status: if status:
@ -406,18 +409,5 @@ class UserClient(threading.Thread):
self.logMsg("Username found: %s" % username, 2) self.logMsg("Username found: %s" % username, 2)
self.auth = True self.auth = True
self.doUtils.stopSession()
if self.stopClient == True:
# If stopping the client didn't work
break
if monitor.waitForAbort(1):
# Abort was requested while waiting. We should exit
break
self.doUtils.stopSession()
self.logMsg("##===---- UserClient Stopped ----===##", 0) self.logMsg("##===---- UserClient Stopped ----===##", 0)
def stopClient(self):
# When emby for kodi terminates
self.stopClient = True

View file

@ -20,17 +20,65 @@ import clientinfo
################################################################################################# #################################################################################################
def borg(cls): def ThreadMethodsStopsync(cls):
""" """
Dekorator to turn a class into a borg class with an added "@utils.borg" Decorator to replace stopThread method to include the Kodi window property
""" 'emby_shouldStop'
cls._state = {}
orig_init = cls.__init__
def new_init(self, *args, **kwargs): Use with any library sync threads. @ThreadMethods still required FIRST
self.__dict__ = cls._state """
orig_init(self, *args, **kwargs) def threadStopped(self):
cls.__init__ = new_init return (self._threadStopped or
self._abortMonitor.abortRequested() or
window('emby_shouldStop') == "true")
cls.threadStopped = threadStopped
return cls
def ThreadMethods(cls):
"""
Decorator to add the following methods to a threading class:
suspendThread(): pauses the thread
resumeThread(): resumes the thread
stopThread(): stopps the thread
threadSuspended(): returns True if thread is suspend_thread
threadStopped(): returns True if thread is stopped (or should stop ;-))
ALSO stops if Kodi is exited
Also adds the following class attributes:
_threadStopped
_threadSuspended
_abortMonitor = xbmc.Monitor() (to check for premature Kodi exit)
"""
# Attach new attributes to class
cls._threadStopped = False
cls._threadSuspended = False
cls._abortMonitor = xbmc.Monitor()
# Define new class methods and attach them to class
def stopThread(self):
self._threadStopped = True
cls.stopThread = stopThread
def suspendThread(self):
self._threadSuspended = True
cls.suspendThread = suspendThread
def resumeThread(self):
self._threadSuspended = False
cls.resumeThread = resumeThread
def threadSuspended(self):
return self._threadSuspended
cls.threadSuspended = threadSuspended
def threadStopped(self):
return self._threadStopped or self._abortMonitor.abortRequested()
cls.threadStopped = threadStopped
# Return class to render this a decorator
return cls return cls

View file

@ -293,7 +293,7 @@ class Service():
##### Emby thread is terminating. ##### ##### Emby thread is terminating. #####
if self.plexCompanion_running: if self.plexCompanion_running:
plexCompanion.stopClient() plexCompanion.stopThread()
if self.library_running: if self.library_running:
library.stopThread() library.stopThread()
@ -302,7 +302,7 @@ class Service():
# ws.stopClient() # ws.stopClient()
if self.userclient_running: if self.userclient_running:
user.stopClient() user.stopThread()
self.logMsg("======== STOP %s ========" % self.addonName, 0) self.logMsg("======== STOP %s ========" % self.addonName, 0)