Revert lib thread changes
This commit is contained in:
parent
8912a0b601
commit
87f9c9ef61
1 changed files with 67 additions and 94 deletions
|
@ -41,11 +41,10 @@ class ThreadedGetMetadata(threading.Thread):
|
||||||
the downloaded metadata XMLs as etree objects
|
the downloaded metadata XMLs as etree objects
|
||||||
lock threading.Lock(), used for counting where we are
|
lock threading.Lock(), used for counting where we are
|
||||||
"""
|
"""
|
||||||
def __init__(self, queue, out_queue, lock, errorQueue):
|
def __init__(self, queue, out_queue, lock):
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.out_queue = out_queue
|
self.out_queue = out_queue
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
self.errorQueue = errorQueue
|
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -55,38 +54,34 @@ class ThreadedGetMetadata(threading.Thread):
|
||||||
lock = self.lock
|
lock = self.lock
|
||||||
threadStopped = self.threadStopped
|
threadStopped = self.threadStopped
|
||||||
global getMetadataCount
|
global getMetadataCount
|
||||||
try:
|
while threadStopped() is False:
|
||||||
while threadStopped() is False:
|
# grabs Plex item from queue
|
||||||
# grabs Plex item from queue
|
try:
|
||||||
try:
|
updateItem = queue.get(block=False)
|
||||||
updateItem = queue.get(block=False)
|
# Empty queue
|
||||||
# Empty queue
|
except Queue.Empty:
|
||||||
except Queue.Empty:
|
continue
|
||||||
continue
|
# Download Metadata
|
||||||
# Download Metadata
|
plexXML = PlexFunctions.GetPlexMetadata(updateItem['itemId'])
|
||||||
plexXML = PlexFunctions.GetPlexMetadata(updateItem['itemId'])
|
try:
|
||||||
try:
|
plexXML.tag
|
||||||
plexXML.tag
|
except:
|
||||||
except:
|
# Did not receive a valid XML - skip that one for now
|
||||||
# Did not receive a valid XML - skip that one for now
|
|
||||||
queue.task_done()
|
|
||||||
continue
|
|
||||||
# Get rid of first XML level:
|
|
||||||
|
|
||||||
updateItem['XML'] = plexXML
|
|
||||||
# place item into out queue
|
|
||||||
out_queue.put(updateItem)
|
|
||||||
del plexXML
|
|
||||||
del updateItem
|
|
||||||
# If we don't have a valid XML, don't put that into the queue
|
|
||||||
# but skip this item for now
|
|
||||||
# Keep track of where we are at
|
|
||||||
with lock:
|
|
||||||
getMetadataCount += 1
|
|
||||||
# signals to queue job is done
|
|
||||||
queue.task_done()
|
queue.task_done()
|
||||||
except:
|
continue
|
||||||
self.errorQueue.put(sys.exc_info())
|
|
||||||
|
updateItem['XML'] = plexXML
|
||||||
|
# place item into out queue
|
||||||
|
out_queue.put(updateItem)
|
||||||
|
del plexXML
|
||||||
|
del updateItem
|
||||||
|
# If we don't have a valid XML, don't put that into the queue
|
||||||
|
# but skip this item for now
|
||||||
|
# Keep track of where we are at
|
||||||
|
with lock:
|
||||||
|
getMetadataCount += 1
|
||||||
|
# signals to queue job is done
|
||||||
|
queue.task_done()
|
||||||
|
|
||||||
|
|
||||||
@utils.ThreadMethodsAdditionalStop('emby_shouldStop')
|
@utils.ThreadMethodsAdditionalStop('emby_shouldStop')
|
||||||
|
@ -103,11 +98,10 @@ class ThreadedProcessMetadata(threading.Thread):
|
||||||
e.g. 'Movies' => itemtypes.Movies()
|
e.g. 'Movies' => itemtypes.Movies()
|
||||||
lock: threading.Lock(), used for counting where we are
|
lock: threading.Lock(), used for counting where we are
|
||||||
"""
|
"""
|
||||||
def __init__(self, queue, itemType, lock, errorQueue):
|
def __init__(self, queue, itemType, lock):
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
self.itemType = itemType
|
self.itemType = itemType
|
||||||
self.errorQueue = errorQueue
|
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -119,37 +113,32 @@ class ThreadedProcessMetadata(threading.Thread):
|
||||||
threadStopped = self.threadStopped
|
threadStopped = self.threadStopped
|
||||||
global processMetadataCount
|
global processMetadataCount
|
||||||
global processingViewName
|
global processingViewName
|
||||||
try:
|
with itemFkt() as item:
|
||||||
with itemFkt() as item:
|
while threadStopped() is False:
|
||||||
while threadStopped() is False:
|
# grabs item from queue
|
||||||
# grabs item from queue
|
try:
|
||||||
try:
|
updateItem = queue.get(block=False)
|
||||||
updateItem = queue.get(block=False)
|
# Empty queue
|
||||||
# Empty queue
|
except Queue.Empty:
|
||||||
except Queue.Empty:
|
continue
|
||||||
continue
|
# Do the work; lock to be sure we've only got 1 Thread
|
||||||
# Do the work; lock to be sure we've only got 1 Thread
|
plexitem = updateItem['XML']
|
||||||
plexitem = updateItem['XML']
|
method = updateItem['method']
|
||||||
method = updateItem['method']
|
viewName = updateItem['viewName']
|
||||||
viewName = updateItem['viewName']
|
viewId = updateItem['viewId']
|
||||||
viewId = updateItem['viewId']
|
title = updateItem['title']
|
||||||
title = updateItem['title']
|
itemSubFkt = getattr(item, method)
|
||||||
itemSubFkt = getattr(item, method)
|
with lock:
|
||||||
with lock:
|
itemSubFkt(plexitem,
|
||||||
itemSubFkt(plexitem,
|
viewtag=viewName,
|
||||||
viewtag=viewName,
|
viewid=viewId)
|
||||||
viewid=viewId)
|
# Keep track of where we are at
|
||||||
# Keep track of where we are at
|
processMetadataCount += 1
|
||||||
processMetadataCount += 1
|
processingViewName = title
|
||||||
processingViewName = title
|
del plexitem
|
||||||
del plexitem
|
del updateItem
|
||||||
del updateItem
|
# signals to queue job is done
|
||||||
# signals to queue job is done
|
self.queue.task_done()
|
||||||
self.queue.task_done()
|
|
||||||
except:
|
|
||||||
xbmc.log('An error occured')
|
|
||||||
xbmc.log(sys.exc_info())
|
|
||||||
self.errorQueue.put(sys.exc_info())
|
|
||||||
|
|
||||||
|
|
||||||
@utils.ThreadMethodsAdditionalStop('emby_shouldStop')
|
@utils.ThreadMethodsAdditionalStop('emby_shouldStop')
|
||||||
|
@ -204,7 +193,7 @@ class ThreadedShowSyncInfo(threading.Thread):
|
||||||
% (getMetadataProgress, processMetadataProgress,
|
% (getMetadataProgress, processMetadataProgress,
|
||||||
viewName))
|
viewName))
|
||||||
except:
|
except:
|
||||||
# Unicode formating of the string?!?
|
# Wierd formating of the string viewName?!?
|
||||||
pass
|
pass
|
||||||
# Sleep for x milliseconds
|
# Sleep for x milliseconds
|
||||||
xbmc.sleep(500)
|
xbmc.sleep(500)
|
||||||
|
@ -669,7 +658,6 @@ class LibrarySync(threading.Thread):
|
||||||
self.logMsg("Starting sync threads", 1)
|
self.logMsg("Starting sync threads", 1)
|
||||||
getMetadataQueue = Queue.Queue()
|
getMetadataQueue = Queue.Queue()
|
||||||
processMetadataQueue = Queue.Queue(maxsize=100)
|
processMetadataQueue = Queue.Queue(maxsize=100)
|
||||||
errorQueue = Queue.Queue()
|
|
||||||
getMetadataLock = threading.Lock()
|
getMetadataLock = threading.Lock()
|
||||||
processMetadataLock = threading.Lock()
|
processMetadataLock = threading.Lock()
|
||||||
# To keep track
|
# To keep track
|
||||||
|
@ -687,12 +675,19 @@ class LibrarySync(threading.Thread):
|
||||||
for i in range(min(self.syncThreadNumber, itemNumber)):
|
for i in range(min(self.syncThreadNumber, itemNumber)):
|
||||||
thread = ThreadedGetMetadata(getMetadataQueue,
|
thread = ThreadedGetMetadata(getMetadataQueue,
|
||||||
processMetadataQueue,
|
processMetadataQueue,
|
||||||
getMetadataLock,
|
getMetadataLock)
|
||||||
errorQueue)
|
|
||||||
thread.setDaemon(True)
|
thread.setDaemon(True)
|
||||||
thread.start()
|
thread.start()
|
||||||
threads.append(thread)
|
threads.append(thread)
|
||||||
self.logMsg("Download threads spawned", 1)
|
self.logMsg("Download threads spawned", 1)
|
||||||
|
# Spawn one more thread to process Metadata, once downloaded
|
||||||
|
thread = ThreadedProcessMetadata(processMetadataQueue,
|
||||||
|
itemType,
|
||||||
|
processMetadataLock)
|
||||||
|
thread.setDaemon(True)
|
||||||
|
thread.start()
|
||||||
|
threads.append(thread)
|
||||||
|
self.logMsg("Processing thread spawned", 1)
|
||||||
# Start one thread to show sync progress
|
# Start one thread to show sync progress
|
||||||
dialog = xbmcgui.DialogProgressBG()
|
dialog = xbmcgui.DialogProgressBG()
|
||||||
thread = ThreadedShowSyncInfo(dialog,
|
thread = ThreadedShowSyncInfo(dialog,
|
||||||
|
@ -703,32 +698,10 @@ class LibrarySync(threading.Thread):
|
||||||
thread.start()
|
thread.start()
|
||||||
threads.append(thread)
|
threads.append(thread)
|
||||||
self.logMsg("Kodi Infobox thread spawned", 1)
|
self.logMsg("Kodi Infobox thread spawned", 1)
|
||||||
# Spawn one more thread to process Metadata, once downloaded
|
|
||||||
thread = ThreadedProcessMetadata(processMetadataQueue,
|
|
||||||
itemType,
|
|
||||||
processMetadataLock,
|
|
||||||
errorQueue)
|
|
||||||
thread.setDaemon(True)
|
|
||||||
thread.start()
|
|
||||||
threads.append(thread)
|
|
||||||
self.logMsg("Processing thread spawned", 1)
|
|
||||||
|
|
||||||
# Wait until finished
|
# Wait until finished
|
||||||
while True:
|
getMetadataQueue.join()
|
||||||
try:
|
processMetadataQueue.join()
|
||||||
exc = errorQueue.get(block=False)
|
|
||||||
except Queue.Empty:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
exc_type, exc_obj, exc_trace = exc
|
|
||||||
# deal with the exception
|
|
||||||
self.logMsg("Error occured in thread", -1)
|
|
||||||
self.logMsg(str(exc_type) + str(exc_obj), -1)
|
|
||||||
self.logMsg(exc_trace, -1)
|
|
||||||
if getMetadataQueue.empty() and processMetadataQueue.empty():
|
|
||||||
break
|
|
||||||
xbmc.sleep(500)
|
|
||||||
|
|
||||||
# Kill threads
|
# Kill threads
|
||||||
self.logMsg("Waiting to kill threads", 1)
|
self.logMsg("Waiting to kill threads", 1)
|
||||||
for thread in threads:
|
for thread in threads:
|
||||||
|
|
Loading…
Reference in a new issue