Sped up initial sync
This commit is contained in:
parent
9f3db90a7c
commit
de3a058463
2 changed files with 134 additions and 111 deletions
|
@ -1599,13 +1599,14 @@ class API():
|
|||
provider = regex.findall(item['guid'])
|
||||
try:
|
||||
provider = provider[0]
|
||||
except:
|
||||
except KeyError:
|
||||
provider = None
|
||||
return provider
|
||||
|
||||
def getTitle(self):
|
||||
"""
|
||||
Returns an item's name/title or "Missing Title Name"
|
||||
Returns an item's name/title or "Missing Title Name" for both XML and
|
||||
JSON PMS replies
|
||||
|
||||
Output:
|
||||
title, sorttitle
|
||||
|
@ -1613,7 +1614,12 @@ class API():
|
|||
sorttitle = title, if no sorttitle is found
|
||||
"""
|
||||
item = self.item
|
||||
item = item[self.child].attrib
|
||||
# XML
|
||||
try:
|
||||
item = item[self.child].attrib
|
||||
# JSON
|
||||
except KeyError:
|
||||
pass
|
||||
try:
|
||||
title = item['title']
|
||||
except:
|
||||
|
|
|
@ -55,14 +55,15 @@ class ThreadedGetMetadata(threading.Thread):
|
|||
while self.stopped() is False:
|
||||
# grabs Plex item from queue
|
||||
try:
|
||||
itemId = self.queue.get(block=False)
|
||||
updateItem = self.queue.get(block=False)
|
||||
# Empty queue
|
||||
except:
|
||||
continue
|
||||
# Download Metadata
|
||||
plexElement = plx.GetPlexMetadata(itemId)
|
||||
plexElement = plx.GetPlexMetadata(updateItem['itemId'])
|
||||
updateItem['XML'] = plexElement
|
||||
# place item into out queue
|
||||
self.out_queue.put(plexElement)
|
||||
self.out_queue.put(updateItem)
|
||||
# Keep track of where we are at
|
||||
with self.lock:
|
||||
getMetadataCount += 1
|
||||
|
@ -84,41 +85,40 @@ class ThreadedProcessMetadata(threading.Thread):
|
|||
Input:
|
||||
queue: Queue.Queue() object that you'll need to fill up with
|
||||
the downloaded XML eTree objects
|
||||
data = {
|
||||
'itemType': as used to call functions in itemtypes.py
|
||||
e.g. 'Movies' => itemtypes.Movies()
|
||||
'method' Name of method in itemtypes.py
|
||||
e.g. 'add_updateSeason'
|
||||
'viewName' Plex str for library view (e.g. 'Movies')
|
||||
'viewId' Plex Id to identifiy the library view
|
||||
}
|
||||
itemType: as used to call functions in itemtypes.py
|
||||
e.g. 'Movies' => itemtypes.Movies()
|
||||
lock: threading.Lock(), used for counting where we are
|
||||
userStop Handle to a function where True is used to stop this Thread
|
||||
"""
|
||||
def __init__(self, queue, data, lock, userStop):
|
||||
def __init__(self, queue, itemType, lock, userStop):
|
||||
self.queue = queue
|
||||
self.lock = lock
|
||||
self.data = data
|
||||
self.itemType = itemType
|
||||
self.userStop = userStop
|
||||
self._shouldstop = threading.Event()
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
def run(self):
|
||||
# Constructs the method name, e.g. itemtypes.Movies
|
||||
itemFkt = getattr(itemtypes, self.data['itemType'])
|
||||
viewName = self.data['viewName']
|
||||
viewId = self.data['viewId']
|
||||
itemFkt = getattr(itemtypes, self.itemType)
|
||||
global processMetadataCount
|
||||
global processingViewName
|
||||
with itemFkt() as item:
|
||||
itemSubFkt = getattr(item, self.data['method'])
|
||||
while self.stopped() is False:
|
||||
# grabs item from queue
|
||||
try:
|
||||
plexitem = self.queue.get(block=False)
|
||||
updateItem = self.queue.get(block=False)
|
||||
# Empty queue
|
||||
except:
|
||||
continue
|
||||
# Do the work; lock to be sure we've only got 1 Thread
|
||||
plexitem = updateItem['XML']
|
||||
method = updateItem['method']
|
||||
viewName = updateItem['viewName']
|
||||
viewId = updateItem['viewId']
|
||||
title = updateItem['title']
|
||||
itemSubFkt = getattr(item, method)
|
||||
|
||||
with self.lock:
|
||||
itemSubFkt(
|
||||
plexitem,
|
||||
|
@ -127,6 +127,7 @@ class ThreadedProcessMetadata(threading.Thread):
|
|||
)
|
||||
# Keep track of where we are at
|
||||
processMetadataCount += 1
|
||||
processingViewName = title
|
||||
# signals to queue job is done
|
||||
self.queue.task_done()
|
||||
|
||||
|
@ -145,16 +146,16 @@ class ThreadedShowSyncInfo(threading.Thread):
|
|||
dialog xbmcgui.DialogProgressBG() object to show progress
|
||||
locks = [downloadLock, processLock] Locks() to the other threads
|
||||
total: Total number of items to get
|
||||
viewName: Name of library we're getting
|
||||
userStop: function handle to stop thread
|
||||
"""
|
||||
def __init__(self, dialog, locks, total, viewName, userStop):
|
||||
def __init__(self, dialog, locks, total, itemType, userStop):
|
||||
self.locks = locks
|
||||
self.total = total
|
||||
self.viewName = viewName
|
||||
self.userStop = userStop
|
||||
self.addonName = clientinfo.ClientInfo().getAddonName()
|
||||
self._shouldstop = threading.Event()
|
||||
self.dialog = dialog
|
||||
self.itemType = itemType
|
||||
threading.Thread.__init__(self)
|
||||
|
||||
def run(self):
|
||||
|
@ -163,18 +164,20 @@ class ThreadedShowSyncInfo(threading.Thread):
|
|||
processLock = self.locks[1]
|
||||
self.dialog.create(
|
||||
"%s: Sync %s: %s items" % (self.addonName,
|
||||
self.viewName,
|
||||
self.itemType,
|
||||
str(total)),
|
||||
"Starting")
|
||||
global getMetadataCount
|
||||
global processMetadataCount
|
||||
global processingViewName
|
||||
total = 2 * total
|
||||
totalProgress = 0
|
||||
while self.stopped() is False:
|
||||
with downloadLock:
|
||||
getMetadataProgress = getMetadataCount
|
||||
#with processLock:
|
||||
processMetadataProgress = processMetadataCount
|
||||
with processLock:
|
||||
processMetadataProgress = processMetadataCount
|
||||
viewName = processingViewName
|
||||
totalProgress = getMetadataProgress + processMetadataProgress
|
||||
try:
|
||||
percentage = int(float(totalProgress) / float(total)*100.0)
|
||||
|
@ -182,10 +185,11 @@ class ThreadedShowSyncInfo(threading.Thread):
|
|||
percentage = 0
|
||||
self.dialog.update(
|
||||
percentage,
|
||||
message="Downloaded: %s, Processed: %s" %
|
||||
(getMetadataProgress, processMetadataProgress)
|
||||
message="Downloaded: %s, Processed: %s: %s" %
|
||||
(getMetadataProgress, processMetadataProgress, viewName)
|
||||
)
|
||||
time.sleep(1)
|
||||
self.dialog.close()
|
||||
|
||||
def stopThread(self):
|
||||
self._shouldstop.set()
|
||||
|
@ -573,7 +577,7 @@ class LibrarySync(threading.Thread):
|
|||
# Save total
|
||||
utils.window('Emby.nodes.total', str(totalnodes))
|
||||
|
||||
def GetUpdatelist(self, elementList):
|
||||
def GetUpdatelist(self, elementList, itemType, method, viewName, viewId):
|
||||
"""
|
||||
Adds items to self.updatelist as well as self.allPlexElementsId dict
|
||||
|
||||
|
@ -584,56 +588,71 @@ class LibrarySync(threading.Thread):
|
|||
Output: self.updatelist, self.allPlexElementsId
|
||||
self.updatelist APPENDED(!!) list itemids (Plex Keys as
|
||||
as received from API.getKey())
|
||||
= [
|
||||
{
|
||||
'itemId': xxx,
|
||||
'itemType': 'Movies'/'TVShows'/...,
|
||||
'method': 'add_update', 'add_updateSeason', ...
|
||||
'viewName': xxx,
|
||||
'viewId': xxx
|
||||
}, ...
|
||||
]
|
||||
self.allPlexElementsId APPENDED(!!) dic
|
||||
= {itemid: checksum}
|
||||
"""
|
||||
if self.compare:
|
||||
# Manual sync
|
||||
for item in elementList:
|
||||
# Skipping XML item 'title=All episodes' without ratingKey
|
||||
# Skipping XML item 'title=All episodes' without a 'ratingKey'
|
||||
if item.get('ratingKey', False):
|
||||
if self.shouldStop():
|
||||
return False
|
||||
API = PlexAPI.API(item)
|
||||
plex_checksum = API.getChecksum()
|
||||
itemid = API.getKey()
|
||||
self.allPlexElementsId[itemid] = plex_checksum
|
||||
kodi_checksum = self.allKodiElementsId.get(itemid)
|
||||
itemId = API.getKey()
|
||||
title, sorttitle = API.getTitle()
|
||||
self.allPlexElementsId[itemId] = plex_checksum
|
||||
kodi_checksum = self.allKodiElementsId.get(itemId)
|
||||
if kodi_checksum != plex_checksum:
|
||||
# Only update if movie is not in Kodi or checksum is
|
||||
# different
|
||||
self.updatelist.append(itemid)
|
||||
self.updatelist.append({
|
||||
'itemId': itemId,
|
||||
'itemType': itemType,
|
||||
'method': method,
|
||||
'viewName': viewName,
|
||||
'viewId': viewId,
|
||||
'title': title})
|
||||
else:
|
||||
# Initial or repair sync: get all Plex movies
|
||||
for item in elementList:
|
||||
# Only look at valid items = Plex library items
|
||||
if item.get('ratingKey', False):
|
||||
if self.shouldStop():
|
||||
return False
|
||||
API = PlexAPI.API(item)
|
||||
itemid = API.getKey()
|
||||
itemId = API.getKey()
|
||||
plex_checksum = API.getChecksum()
|
||||
self.allPlexElementsId[itemid] = plex_checksum
|
||||
self.updatelist.append(itemid)
|
||||
self.allPlexElementsId[itemId] = plex_checksum
|
||||
self.updatelist.append({
|
||||
'itemId': itemId,
|
||||
'itemType': itemType,
|
||||
'method': method,
|
||||
'viewName': viewName,
|
||||
'viewId': viewId})
|
||||
# Update the Kodi popup info
|
||||
return self.updatelist, self.allPlexElementsId
|
||||
|
||||
def GetAndProcessXMLs(self, itemType, viewName, viewId, method):
|
||||
def GetAndProcessXMLs(self, itemType):
|
||||
"""
|
||||
Downloads all XMLs for itemType (e.g. Movies, TV-Shows). Processes them
|
||||
by then calling itemtypes.<itemType>()
|
||||
|
||||
Input:
|
||||
itemType: to construct itemtypes.py function name: 'Movies'
|
||||
viewName: Plex name of library, e.g. 'My Movies'
|
||||
viewId: Plex Id for that library
|
||||
method: name of itemtypes.py method,
|
||||
e.g. 'add_updateSeason'
|
||||
|
||||
Returns True if/when done
|
||||
self.updatelist
|
||||
"""
|
||||
# Some logging, just in case.
|
||||
self.logMsg("self.updatelist: %s" % self.updatelist, 2)
|
||||
self.logMsg("self.allPlexElementsId: %s" % self.allPlexElementsId, 2)
|
||||
self.logMsg("self.allKodiElementsId: %s" % self.allKodiElementsId, 2)
|
||||
|
||||
# Run through self.updatelist, get XML metadata per item
|
||||
# Initiate threads
|
||||
|
@ -649,36 +668,28 @@ class LibrarySync(threading.Thread):
|
|||
getMetadataCount = 0
|
||||
global processMetadataCount
|
||||
processMetadataCount = 0
|
||||
global processingViewName
|
||||
processingViewName = ''
|
||||
# Populate queue: GetMetadata
|
||||
for itemId in self.updatelist:
|
||||
getMetadataQueue.put(itemId)
|
||||
# Spawn GetMetadata threads
|
||||
for updateItem in self.updatelist:
|
||||
getMetadataQueue.put(updateItem)
|
||||
# Spawn GetMetadata threads for downloading
|
||||
threads = []
|
||||
for i in range(self.syncThreadNumber):
|
||||
t = ThreadedGetMetadata(
|
||||
getMetadataQueue,
|
||||
processMetadataQueue,
|
||||
getMetadataLock,
|
||||
self.shouldStop
|
||||
)
|
||||
t.setDaemon(True)
|
||||
t.start()
|
||||
threads.append(t)
|
||||
thread = ThreadedGetMetadata(getMetadataQueue,
|
||||
processMetadataQueue,
|
||||
getMetadataLock,
|
||||
self.shouldStop)
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
self.logMsg("%s download threads spawned" % self.syncThreadNumber, 1)
|
||||
self.logMsg("Queue populated", 1)
|
||||
# Spawn one more thread to process Metadata, once downloaded
|
||||
data = {
|
||||
'itemType': itemType,
|
||||
'method': method,
|
||||
'viewName': viewName,
|
||||
'viewId': viewId
|
||||
}
|
||||
thread = ThreadedProcessMetadata(
|
||||
processMetadataQueue,
|
||||
data,
|
||||
processMetadataLock,
|
||||
self.shouldStop
|
||||
)
|
||||
thread = ThreadedProcessMetadata(processMetadataQueue,
|
||||
itemType,
|
||||
processMetadataLock,
|
||||
self.shouldStop)
|
||||
thread.setDaemon(True)
|
||||
thread.start()
|
||||
threads.append(thread)
|
||||
|
@ -691,7 +702,7 @@ class LibrarySync(threading.Thread):
|
|||
dialog,
|
||||
[getMetadataLock, processMetadataLock],
|
||||
total,
|
||||
viewName,
|
||||
itemType,
|
||||
self.shouldStop
|
||||
)
|
||||
thread.setDaemon(True)
|
||||
|
@ -709,7 +720,8 @@ class LibrarySync(threading.Thread):
|
|||
# Wait till threads are indeed dead
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
dialog.close()
|
||||
if dialog:
|
||||
dialog.close()
|
||||
self.logMsg("=====================", 1)
|
||||
self.logMsg("Sync threads finished", 1)
|
||||
self.logMsg("=====================", 1)
|
||||
|
@ -718,11 +730,11 @@ class LibrarySync(threading.Thread):
|
|||
def PlexMovies(self):
|
||||
# Initialize
|
||||
plx = PlexAPI.PlexAPI()
|
||||
self.updatelist = []
|
||||
self.allPlexElementsId = {}
|
||||
itemType = 'Movies'
|
||||
|
||||
views = plx.GetPlexCollections('movie')
|
||||
self.logMsg("Media folders: %s" % views, 1)
|
||||
self.logMsg("Processing Plex %s. Libraries: %s" % (itemType, views), 1)
|
||||
|
||||
if self.compare:
|
||||
# Get movies from Plex server
|
||||
|
@ -740,8 +752,8 @@ class LibrarySync(threading.Thread):
|
|||
self.allKodiElementsId = {}
|
||||
|
||||
##### PROCESS MOVIES #####
|
||||
self.updatelist = []
|
||||
for view in views:
|
||||
self.updatelist = []
|
||||
if self.shouldStop():
|
||||
return False
|
||||
# Get items per view
|
||||
|
@ -749,14 +761,13 @@ class LibrarySync(threading.Thread):
|
|||
viewName = view['name']
|
||||
all_plexmovies = plx.GetPlexSectionResults(viewId)
|
||||
# Populate self.updatelist and self.allPlexElementsId
|
||||
self.GetUpdatelist(all_plexmovies)
|
||||
self.logMsg("Processed view %s with ID %s" % (viewName, viewId), 1)
|
||||
self.GetAndProcessXMLs(
|
||||
'Movies',
|
||||
viewName,
|
||||
viewId,
|
||||
'add_update'
|
||||
)
|
||||
self.GetUpdatelist(all_plexmovies,
|
||||
itemType,
|
||||
'add_update',
|
||||
viewName,
|
||||
viewId)
|
||||
self.GetAndProcessXMLs(itemType)
|
||||
self.logMsg("Processed view %s with ID %s" % (viewName, viewId), 1)
|
||||
##### PROCESS DELETES #####
|
||||
if self.compare:
|
||||
# Manual sync, process deletes
|
||||
|
@ -765,7 +776,7 @@ class LibrarySync(threading.Thread):
|
|||
if kodimovie not in self.allPlexElementsId:
|
||||
Movie.remove(kodimovie)
|
||||
else:
|
||||
self.logMsg("%s compare finished." % 'Movies', 1)
|
||||
self.logMsg("%s compare finished." % itemType, 1)
|
||||
return True
|
||||
|
||||
def musicvideos(self, embycursor, kodicursor, pdialog, compare=False):
|
||||
|
@ -959,8 +970,8 @@ class LibrarySync(threading.Thread):
|
|||
def PlexTVShows(self):
|
||||
# Initialize
|
||||
plx = PlexAPI.PlexAPI()
|
||||
self.updatelist = []
|
||||
self.allPlexElementsId = {}
|
||||
itemType = 'TVShows'
|
||||
|
||||
views = plx.GetPlexCollections('show')
|
||||
self.logMsg("Media folders: %s" % views, 1)
|
||||
|
@ -986,8 +997,8 @@ class LibrarySync(threading.Thread):
|
|||
embyconn.close()
|
||||
|
||||
##### PROCESS TV Shows #####
|
||||
self.updatelist = []
|
||||
for view in views:
|
||||
self.updatelist = []
|
||||
if self.shouldStop():
|
||||
return False
|
||||
# Get items per view
|
||||
|
@ -995,12 +1006,11 @@ class LibrarySync(threading.Thread):
|
|||
viewName = view['name']
|
||||
allPlexTvShows = plx.GetPlexSectionResults(viewId)
|
||||
# Populate self.updatelist and self.allPlexElementsId
|
||||
self.GetUpdatelist(allPlexTvShows)
|
||||
# Process self.updatelist
|
||||
self.GetAndProcessXMLs('TVShows',
|
||||
viewName,
|
||||
viewId,
|
||||
'add_update')
|
||||
self.GetUpdatelist(allPlexTvShows,
|
||||
itemType,
|
||||
'add_update',
|
||||
viewName,
|
||||
viewId)
|
||||
self.logMsg("Processed view %s with ID %s" % (viewName, viewId), 1)
|
||||
|
||||
# COPY for later use
|
||||
|
@ -1009,35 +1019,42 @@ class LibrarySync(threading.Thread):
|
|||
##### PROCESS TV Seasons #####
|
||||
# Cycle through tv shows
|
||||
for tvShowId in allPlexTvShowsId:
|
||||
self.updatelist = []
|
||||
if self.shouldStop():
|
||||
return False
|
||||
# Grab all seasons to tvshow from PMS
|
||||
seasons = plx.GetAllPlexChildren(tvShowId)
|
||||
# Populate self.updatelist and self.allPlexElementsId
|
||||
self.GetUpdatelist(seasons)
|
||||
# Process self.updatelist
|
||||
self.GetAndProcessXMLs('TVShows',
|
||||
None,
|
||||
tvShowId, # send showId instead of viewid
|
||||
'add_updateSeason')
|
||||
XMLtvshow = plx.GetPlexMetadata(tvShowId)
|
||||
with itemtypes.TVShows() as TVshow:
|
||||
TVshow.refreshSeasonEntry(XMLtvshow, tvShowId)
|
||||
self.GetUpdatelist(seasons,
|
||||
itemType,
|
||||
'add_updateSeason',
|
||||
None,
|
||||
tvShowId) # send showId instead of viewid
|
||||
self.logMsg("Processed all seasons of TV show with Plex Id %s" % tvShowId, 1)
|
||||
|
||||
##### PROCESS TV Episodes #####
|
||||
# Cycle through tv shows
|
||||
for tvShowId in allPlexTvShowsId:
|
||||
self.updatelist = []
|
||||
if self.shouldStop():
|
||||
return False
|
||||
# Grab all episodes to tvshow from PMS
|
||||
episodes = plx.GetAllPlexLeaves(tvShowId)
|
||||
# Populate self.updatelist and self.allPlexElementsId
|
||||
self.GetUpdatelist(episodes)
|
||||
# Process self.updatelist
|
||||
self.GetAndProcessXMLs('TVShows',
|
||||
None,
|
||||
None,
|
||||
'add_updateEpisode')
|
||||
self.GetUpdatelist(episodes,
|
||||
itemType,
|
||||
'add_updateEpisode',
|
||||
None,
|
||||
None)
|
||||
self.logMsg("Processed all episodes of TV show with Plex Id %s" % tvShowId, 1)
|
||||
|
||||
# Process self.updatelist
|
||||
self.GetAndProcessXMLs(itemType)
|
||||
# Refresh season info
|
||||
# Cycle through tv shows
|
||||
with itemtypes.TVShows() as TVshow:
|
||||
for tvShowId in allPlexTvShowsId:
|
||||
XMLtvshow = plx.GetPlexMetadata(tvShowId)
|
||||
TVshow.refreshSeasonEntry(XMLtvshow, tvShowId)
|
||||
|
||||
##### PROCESS DELETES #####
|
||||
if self.compare:
|
||||
# Manual sync, process deletes
|
||||
|
|
Loading…
Reference in a new issue