Redesign library sync loops

This commit is contained in:
tomkat83 2016-01-27 20:41:28 +01:00
parent 40c8b6f683
commit 99895ec49f
3 changed files with 161 additions and 202 deletions

View file

@ -11,6 +11,24 @@ addonName = Addon().getAddonInfo('name')
title = "%s %s" % (addonName, __name__) title = "%s %s" % (addonName, __name__)
def GetItemClassFromType(itemType):
classes = {
'movie': 'Movies',
'episodes': 'TVShows',
'episode': 'TVShows',
'show': 'TVShows'
}
return classes[itemType]
def GetMethodFromPlexType(plexType):
methods = {
'movie': 'add_update',
'episode': 'add_updateEpisode'
}
return methods[plexType]
def XbmcItemtypes(): def XbmcItemtypes():
return ['photo', 'video', 'audio'] return ['photo', 'video', 'audio']
@ -128,6 +146,24 @@ def GetPlexSectionResults(viewId, headerOptions={}):
return result return result
def GetPlexUpdatedItems(viewId, unixTime, headerOptions={}):
"""
Returns a list (raw JSON or XML API dump) of all Plex items in the Plex
section with key = viewId AFTER the unixTime
"""
result = []
url = "{server}/library/sections/%s/allLeaves?updatedAt>=%s" \
% (viewId, unixTime)
jsondata = DownloadUtils().downloadUrl(url, headerOptions=headerOptions)
try:
result = jsondata['_children']
except KeyError:
logMsg(title,
"Error retrieving all items for Plex section %s and time %s"
% (viewId, unixTime), -1)
return result
def GetAllPlexLeaves(viewId, headerOptions={}): def GetAllPlexLeaves(viewId, headerOptions={}):
""" """
Returns a list (raw JSON or XML API dump) of all Plex subitems for the Returns a list (raw JSON or XML API dump) of all Plex subitems for the

View file

@ -31,6 +31,25 @@ class Embydb_Functions():
return views return views
def getAllViewInfo(self):
embycursor = self.embycursor
views = []
query = ' '.join((
"SELECT view_id, view_name, media_type",
"FROM view"
))
embycursor.execute(query)
rows = embycursor.fetchall()
for row in rows:
views.append({'id': row[0],
'name': row[1],
'itemtype': row[2]})
return views
def getView_byId(self, viewid): def getView_byId(self, viewid):
embycursor = self.embycursor embycursor = self.embycursor
@ -53,7 +72,7 @@ class Embydb_Functions():
query = ' '.join(( query = ' '.join((
"SELECT view_id, view_name", "SELECT view_id, view_name, media_type",
"FROM view", "FROM view",
"WHERE media_type = ?" "WHERE media_type = ?"
)) ))
@ -63,7 +82,8 @@ class Embydb_Functions():
views.append({ views.append({
'id': row[0], 'id': row[0],
'name': row[1] 'name': row[1],
'itemtype': row[2]
}) })
return views return views

View file

@ -220,6 +220,9 @@ class LibrarySync(threading.Thread):
self.vnodes = videonodes.VideoNodes() self.vnodes = videonodes.VideoNodes()
self.syncThreadNumber = int(utils.settings('syncThreadNumber')) self.syncThreadNumber = int(utils.settings('syncThreadNumber'))
self.installSyncDone = True if \
utils.settings('SyncInstallRunDone') == 'true' else False
threading.Thread.__init__(self) threading.Thread.__init__(self)
def progressDialog(self, title, forced=False): def progressDialog(self, title, forced=False):
@ -234,22 +237,15 @@ class LibrarySync(threading.Thread):
return dialog return dialog
def startSync(self): def startSync(self):
# Run at start up - optional to use the server plugin # Only run fastSync AFTER startup when SyncInstallRunDone has already
if utils.settings('SyncInstallRunDone') == "true": # been set
utils.window('emby_dbScan', value="true")
# Validate views completed = self.fastSync()
self.maintainViews() if not completed:
completed = False # Fast sync failed or server plugin is not found
self.logMsg("Something went wrong, starting full sync", 0)
# completed = self.fastSync() completed = self.fullSync(manualrun=True)
utils.window('emby_dbScan', value="false")
if not completed:
# Fast sync failed or server plugin is not found
completed = self.fullSync(manualrun=True)
else:
# Install sync is not completed
completed = self.fullSync()
return completed return completed
def fastSync(self): def fastSync(self):
@ -259,6 +255,7 @@ class LibrarySync(threading.Thread):
Using /library/recentlyAdded is NOT working as changes to lib items are Using /library/recentlyAdded is NOT working as changes to lib items are
not reflected not reflected
""" """
self.compare = True
# Get last sync time # Get last sync time
lastSync = utils.window('LastIncrementalSync') lastSync = utils.window('LastIncrementalSync')
if not lastSync: if not lastSync:
@ -268,15 +265,10 @@ class LibrarySync(threading.Thread):
lastSync = '1420070400' lastSync = '1420070400'
self.logMsg("Last sync run: %s" % lastSync, 1) self.logMsg("Last sync run: %s" % lastSync, 1)
# Get all PMS views and PMS items already saved in Kodi # Get all PMS items already saved in Kodi
self.maintainViews()
embyconn = utils.kodiSQL('emby') embyconn = utils.kodiSQL('emby')
embycursor = embyconn.cursor() embycursor = embyconn.cursor()
emby_db = embydb.Embydb_Functions(embycursor) emby_db = embydb.Embydb_Functions(embycursor)
views = []
for itemtype in PlexFunctions.PlexLibraryItemtypes():
views.append(emby_db.getView_byType(itemtype))
self.logMsg("views is now: %s" % views, 2)
# Also get checksums of every Plex items already saved in Kodi # Also get checksums of every Plex items already saved in Kodi
allKodiElementsId = {} allKodiElementsId = {}
for itemtype in PlexFunctions.EmbyItemtypes(): for itemtype in PlexFunctions.EmbyItemtypes():
@ -284,47 +276,47 @@ class LibrarySync(threading.Thread):
allKodiElementsId.update(dict(emby_db.getChecksum(itemtype))) allKodiElementsId.update(dict(emby_db.getChecksum(itemtype)))
except ValueError: except ValueError:
pass pass
self.logMsg("allKodiElementsId is now: %s" % allKodiElementsId, 2) embyconn.close()
self.allKodiElementsId = allKodiElementsId
# Run through views and get latest changed elements using time diff # Run through views and get latest changed elements using time diff
for view in views: self.updatelist = []
self.allPlexElementsId = {}
updateKodiVideoLib = False
for view in self.views:
if self.threadStopped(): if self.threadStopped():
return False return True
# Get items per view # Get items per view
viewId = view['id'] items = PlexFunctions.GetPlexUpdatedItems(view['id'], lastSync)
viewName = view['name'] if not items:
all_plexmovies = PlexFunctions.GetPlexSectionResults(viewId) continue
# Populate self.updatelist and self.allPlexElementsId # Get one itemtype, because they're the same in the PMS section
self.GetUpdatelist(all_plexmovies, plexType = items[0]['type']
itemType, # Populate self.updatelist
'add_update', self.GetUpdatelist(items,
viewName, PlexFunctions.GetItemClassFromType(plexType),
viewId) PlexFunctions.GetItemClassFromType(plexType),
view['name'],
# Figure out whether an item needs updating view['id'])
# Process updates # Process self.updatelist
url = "{server}/library/recentlyAdded" if self.updatelist:
result = self.doUtils.downloadUrl(url, parameters=params) if self.updatelist[0]['itemType'] in ['Movies', 'TVShows']:
updateKodiVideoLib = True
try: self.GetAndProcessXMLs(plexType)
processlist = { self.updatelist = []
# Let Kodi grab the artwork now
'added': result['ItemsAdded'], if updateKodiVideoLib:
'update': result['ItemsUpdated'], xbmc.executebuiltin('UpdateLibrary(video)')
'userdata': result['UserDataChanged'], # Update userdata
'remove': result['ItemsRemoved'] for view in self.views:
} self.PlexUpdateWatched(
view['id'],
except (KeyError, TypeError): PlexFunctions.GetItemClassFromType(view['itemtype']))
self.logMsg("Failed to retrieve latest updates using fast sync.", 1) # Reset and return
return False self.saveLastSync()
self.allKodiElementsId = {}
else: self.allPlexElementsId = {}
self.logMsg("Fast sync changes: %s" % result, 1) return True
for action in processlist:
self.triage_items(action, processlist[action])
return True
def saveLastSync(self): def saveLastSync(self):
# Save last sync time # Save last sync time
@ -359,7 +351,6 @@ class LibrarySync(threading.Thread):
self.compare = manualrun self.compare = manualrun
music_enabled = utils.settings('enableMusic') == "true" music_enabled = utils.settings('enableMusic') == "true"
utils.window('emby_dbScan', value="true")
# Add sources # Add sources
utils.sourcesXML() utils.sourcesXML()
@ -395,9 +386,6 @@ class LibrarySync(threading.Thread):
startTime = datetime.now() startTime = datetime.now()
completed = process[itemtype]() completed = process[itemtype]()
if not completed: if not completed:
utils.window('emby_dbScan', clear=True)
return False return False
else: else:
elapsedTime = datetime.now() - startTime elapsedTime = datetime.now() - startTime
@ -429,13 +417,10 @@ class LibrarySync(threading.Thread):
# % (str(elapsedTime).split('.')[0]), 1) # % (str(elapsedTime).split('.')[0]), 1)
# musiccursor.close() # musiccursor.close()
utils.settings('SyncInstallRunDone', value="true")
utils.settings("dbCreatedWithVersion", self.clientInfo.getVersion())
self.saveLastSync()
xbmc.executebuiltin('UpdateLibrary(video)') xbmc.executebuiltin('UpdateLibrary(video)')
self.saveLastSync()
elapsedtotal = datetime.now() - starttotal elapsedtotal = datetime.now() - starttotal
utils.window('emby_dbScan', clear=True)
utils.window('emby_initialScan', clear=True) utils.window('emby_initialScan', clear=True)
xbmcgui.Dialog().notification( xbmcgui.Dialog().notification(
heading=self.addonName, heading=self.addonName,
@ -461,9 +446,7 @@ class LibrarySync(threading.Thread):
vnodes = self.vnodes vnodes = self.vnodes
# Get views # Get views
url = "{server}/library/sections" result = doUtils.downloadUrl("{server}/library/sections")['_children']
result = doUtils.downloadUrl(url)
result = result['_children']
# total nodes for window properties # total nodes for window properties
vnodes.clearProperties() vnodes.clearProperties()
@ -564,6 +547,10 @@ class LibrarySync(threading.Thread):
# totalnodes += 1 # totalnodes += 1
# Save total # Save total
utils.window('Emby.nodes.total', str(totalnodes)) utils.window('Emby.nodes.total', str(totalnodes))
# update views for all:
self.views = emby_db.getAllViewInfo()
self.logMsg("views saved: %s" % self.views, 1)
# commit changes to DB # commit changes to DB
embyconn.commit() embyconn.commit()
kodiconn.commit() kodiconn.commit()
@ -591,7 +578,8 @@ class LibrarySync(threading.Thread):
'itemType': 'Movies','TVShows', ... 'itemType': 'Movies','TVShows', ...
'method': 'add_update', 'add_updateSeason', ... 'method': 'add_update', 'add_updateSeason', ...
'viewName': xxx, 'viewName': xxx,
'viewId': xxx 'viewId': xxx,
'title': xxx
self.allPlexElementsId APPENDED(!!) dict self.allPlexElementsId APPENDED(!!) dict
= {itemid: checksum} = {itemid: checksum}
@ -651,14 +639,13 @@ class LibrarySync(threading.Thread):
""" """
# Some logging, just in case. # Some logging, just in case.
self.logMsg("self.updatelist: %s" % self.updatelist, 2) self.logMsg("self.updatelist: %s" % self.updatelist, 2)
if not len(self.updatelist) > 0: itemNumber = len(self.updatelist)
if itemNumber == 0:
return True return True
# Run through self.updatelist, get XML metadata per item # Run through self.updatelist, get XML metadata per item
# Initiate threads # Initiate threads
self.logMsg("=====================", 1)
self.logMsg("Starting sync threads", 1) self.logMsg("Starting sync threads", 1)
self.logMsg("=====================", 1)
getMetadataQueue = Queue.Queue() getMetadataQueue = Queue.Queue()
processMetadataQueue = Queue.Queue(maxsize=100) processMetadataQueue = Queue.Queue(maxsize=100)
getMetadataLock = threading.Lock() getMetadataLock = threading.Lock()
@ -675,14 +662,14 @@ class LibrarySync(threading.Thread):
getMetadataQueue.put(updateItem) getMetadataQueue.put(updateItem)
# Spawn GetMetadata threads for downloading # Spawn GetMetadata threads for downloading
threads = [] threads = []
for i in range(self.syncThreadNumber): for i in range(min(self.syncThreadNumber, itemNumber)):
thread = ThreadedGetMetadata(getMetadataQueue, thread = ThreadedGetMetadata(getMetadataQueue,
processMetadataQueue, processMetadataQueue,
getMetadataLock) getMetadataLock)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
threads.append(thread) threads.append(thread)
self.logMsg("%s download threads spawned" % self.syncThreadNumber, 1) self.logMsg("Download threads spawned", 1)
# Spawn one more thread to process Metadata, once downloaded # Spawn one more thread to process Metadata, once downloaded
thread = ThreadedProcessMetadata(processMetadataQueue, thread = ThreadedProcessMetadata(processMetadataQueue,
itemType, itemType,
@ -694,10 +681,9 @@ class LibrarySync(threading.Thread):
# Start one thread to show sync progress # Start one thread to show sync progress
dialog = xbmcgui.DialogProgressBG() dialog = xbmcgui.DialogProgressBG()
total = len(self.updatelist)
thread = ThreadedShowSyncInfo(dialog, thread = ThreadedShowSyncInfo(dialog,
[getMetadataLock, processMetadataLock], [getMetadataLock, processMetadataLock],
total, itemNumber,
itemType) itemType)
thread.setDaemon(True) thread.setDaemon(True)
thread.start() thread.start()
@ -723,9 +709,8 @@ class LibrarySync(threading.Thread):
# Make sure dialog window is closed # Make sure dialog window is closed
if dialog: if dialog:
dialog.close() dialog.close()
self.logMsg("=====================", 1)
self.logMsg("Sync threads finished", 1) self.logMsg("Sync threads finished", 1)
self.logMsg("=====================", 1) self.updatelist = []
return True return True
def PlexMovies(self): def PlexMovies(self):
@ -738,7 +723,7 @@ class LibrarySync(threading.Thread):
emby_db = embydb.Embydb_Functions(embycursor) emby_db = embydb.Embydb_Functions(embycursor)
itemType = 'Movies' itemType = 'Movies'
views = emby_db.getView_byType('movie') views = [x for x in self.views if x['itemtype'] == 'movie']
self.logMsg("Processing Plex %s. Libraries: %s" % (itemType, views), 1) self.logMsg("Processing Plex %s. Libraries: %s" % (itemType, views), 1)
if self.compare: if self.compare:
@ -861,7 +846,7 @@ class LibrarySync(threading.Thread):
embycursor = embyconn.cursor() embycursor = embyconn.cursor()
emby_db = embydb.Embydb_Functions(embycursor) emby_db = embydb.Embydb_Functions(embycursor)
views = emby_db.getView_byType('show') views = [x for x in self.views if x['itemtype'] == 'show']
self.logMsg("Media folders for %s: %s" % (itemType, views), 1) self.logMsg("Media folders for %s: %s" % (itemType, views), 1)
self.allKodiElementsId = {} self.allKodiElementsId = {}
@ -1009,106 +994,6 @@ class LibrarySync(threading.Thread):
return True return True
# Reserved for websocket_client.py and fast start
def triage_items(self, process, items):
processlist = {
'added': self.addedItems,
'update': self.updateItems,
'userdata': self.userdataItems,
'remove': self.removeItems
}
if items:
if process == "userdata":
itemids = []
for item in items:
itemids.append(item['ItemId'])
items = itemids
self.logMsg("Queue %s: %s" % (process, items), 1)
processlist[process].extend(items)
def incrementalSync(self):
embyconn = utils.kodiSQL('emby')
embycursor = embyconn.cursor()
kodiconn = utils.kodiSQL('video')
kodicursor = kodiconn.cursor()
emby = self.emby
emby_db = embydb.Embydb_Functions(embycursor)
pDialog = None
if self.refresh_views:
# Received userconfig update
self.refresh_views = False
self.maintainViews(embycursor, kodicursor)
self.forceLibraryUpdate = True
if self.addedItems or self.updateItems or self.userdataItems or self.removeItems:
# Only present dialog if we are going to process items
pDialog = self.progressDialog('Incremental sync')
process = {
'added': self.addedItems,
'update': self.updateItems,
'userdata': self.userdataItems,
'remove': self.removeItems
}
types = ['added', 'update', 'userdata', 'remove']
for type in types:
if process[type] and utils.window('emby_kodiScan') != "true":
listItems = list(process[type])
del process[type][:] # Reset class list
items_process = itemtypes.Items(embycursor, kodicursor)
update = False
# Prepare items according to process type
if type == "added":
items = emby.sortby_mediatype(listItems)
elif type in ("userdata", "remove"):
items = emby_db.sortby_mediaType(listItems, unsorted=False)
else:
items = emby_db.sortby_mediaType(listItems)
if items.get('Unsorted'):
sorted_items = emby.sortby_mediatype(items['Unsorted'])
doupdate = items_process.itemsbyId(sorted_items, "added", pDialog)
if doupdate:
update = True
del items['Unsorted']
doupdate = items_process.itemsbyId(items, type, pDialog)
if doupdate:
update = True
if update:
self.forceLibraryUpdate = True
if self.forceLibraryUpdate:
# Force update the Kodi library
self.forceLibraryUpdate = False
self.dbCommit(kodiconn)
embyconn.commit()
self.saveLastSync()
self.logMsg("Updating video library.", 1)
utils.window('emby_kodiScan', value="true")
xbmc.executebuiltin('UpdateLibrary(video)')
if pDialog:
pDialog.close()
kodicursor.close()
embycursor.close()
def compareDBVersion(self, current, minimum): def compareDBVersion(self, current, minimum):
# It returns True is database is up to date. False otherwise. # It returns True is database is up to date. False otherwise.
self.logMsg("current: %s minimum: %s" % (current, minimum), 1) self.logMsg("current: %s minimum: %s" % (current, minimum), 1)
@ -1140,6 +1025,8 @@ class LibrarySync(threading.Thread):
def run_internal(self): def run_internal(self):
startupComplete = False startupComplete = False
self.views = []
count = 0
self.logMsg("---===### Starting LibrarySync ###===---", 0) self.logMsg("---===### Starting LibrarySync ###===---", 0)
while not self.threadStopped(): while not self.threadStopped():
@ -1149,12 +1036,11 @@ class LibrarySync(threading.Thread):
# Set in service.py # Set in service.py
if self.threadStopped(): if self.threadStopped():
# Abort was requested while waiting. We should exit # Abort was requested while waiting. We should exit
break return
xbmc.sleep(3000) xbmc.sleep(3000)
if (utils.window('emby_dbCheck') != "true" and if (utils.window('emby_dbCheck') != "true" and
utils.settings('SyncInstallRunDone') == "true"): self.installSyncDone):
# Verify the validity of the database # Verify the validity of the database
currentVersion = utils.settings('dbCreatedWithVersion') currentVersion = utils.settings('dbCreatedWithVersion')
minVersion = utils.window('emby_minDBVersion') minVersion = utils.window('emby_minDBVersion')
@ -1181,8 +1067,8 @@ class LibrarySync(threading.Thread):
utils.window('emby_dbCheck', value="true") utils.window('emby_dbCheck', value="true")
if not startupComplete: if not startupComplete:
# Also runs when installed first
# Verify the video database can be found # Verify the video database can be found
videoDb = utils.getKodiVideoDBPath() videoDb = utils.getKodiVideoDBPath()
if not xbmcvfs.exists(videoDb): if not xbmcvfs.exists(videoDb):
@ -1202,10 +1088,11 @@ class LibrarySync(threading.Thread):
break break
# Run start up sync # Run start up sync
utils.window('emby_dbScan', value="true")
self.logMsg("Db version: %s" % utils.settings('dbCreatedWithVersion'), 0) self.logMsg("Db version: %s" % utils.settings('dbCreatedWithVersion'), 0)
self.logMsg("SyncDatabase (started)", 1) self.logMsg("SyncDatabase (started)", 1)
startTime = datetime.now() startTime = datetime.now()
librarySync = self.startSync() librarySync = self.fullSync(manualrun=True)
elapsedTime = datetime.now() - startTime elapsedTime = datetime.now() - startTime
self.logMsg( self.logMsg(
"SyncDatabase (finished in: %s) %s" "SyncDatabase (finished in: %s) %s"
@ -1213,27 +1100,43 @@ class LibrarySync(threading.Thread):
# Only try the initial sync once per kodi session regardless # Only try the initial sync once per kodi session regardless
# This will prevent an infinite loop in case something goes wrong. # This will prevent an infinite loop in case something goes wrong.
startupComplete = True startupComplete = True
utils.settings(
'SyncInstallRunDone', value="true")
utils.settings(
"dbCreatedWithVersion", self.clientInfo.getVersion())
self.installSyncDone = True
utils.window('emby_dbScan', value="false")
if utils.window('emby_dbScan') != "true": # Currently no db scan, so we can start a new scan
# Currently no db scan, so we can start a new scan elif utils.window('emby_dbScan') != "true":
# Full scan was requested from somewhere else, e.g. userclient
if utils.window('plex_runLibScan') == "true": if utils.window('plex_runLibScan') == "true":
self.logMsg('Full library scan requested, starting', 1) self.logMsg('Full library scan requested, starting', 1)
utils.window('emby_dbScan', value="true")
utils.window('plex_runLibScan', value='false') utils.window('plex_runLibScan', value='false')
self.fullSync(manualrun=True) self.fullSync(manualrun=True)
utils.window('emby_dbScan', value="false")
count = 0
else: else:
self.incrementalSync() # Run full lib scan approx every 10min
if count % 600 == 0:
if (utils.window('emby_onWake') == "true" and self.logMsg('Running maintainViews() scan', 1)
utils.window('emby_online') == "true"): utils.window('emby_dbScan', value="true")
# Kodi is waking up self.fullSync(manualrun=True)
# Set in kodimonitor.py utils.window('emby_dbScan', value="false")
utils.window('emby_onWake', clear=True) count = 0
if utils.window('emby_syncRunning') != "true": # Update views / PMS libraries approx. every 2 minutes
self.logMsg("SyncDatabase onWake (started)", 0) elif count % 120 == 0:
librarySync = self.startSync() self.logMsg('Running maintainViews() scan', 1)
self.logMsg("SyncDatabase onWake (finished) %s" % librarySync, 0) utils.window('emby_dbScan', value="true")
self.maintainViews()
self.startSync()
# Run fast sync approx every 10s
elif count % 10 == 0:
self.startSync()
xbmc.sleep(1000) xbmc.sleep(1000)
count += 1
self.logMsg("###===--- LibrarySync Stopped ---===###", 0) self.logMsg("###===--- LibrarySync Stopped ---===###", 0)