Cleanup background sync

This commit is contained in:
tomkat83 2016-03-27 19:06:36 +02:00
parent 1e49e9dea9
commit fb1bc7c555
6 changed files with 169 additions and 409 deletions

View File

@ -395,6 +395,7 @@
<string id="39050">[COLOR yellow]Choose Plex Server from a list[/COLOR]</string>
<string id="39051">Wait before sync new/changed PMS item [s]</string>
<string id="39052">Background Sync</string>
<string id="39053">Do a full library sync every x minutes</string>

View File

@ -333,6 +333,8 @@
<string id="39050">[COLOR yellow]Plex Server aus Liste auswählen[/COLOR]</string>
<string id="39051">Warten bevor neue/geänderte PMS Einträge gesynct werden [s]</string>
<string id="39052">Hintergrund-Synchronisation</string>
<string id="39053">Kompletten Scan aller Bibliotheken alle x Minuten durchführen</string>
<!-- Plex Entrypoint.py -->
<string id="39200">Plex Home Benutzer abmelden: </string>

View File

@ -2132,8 +2132,8 @@ class Music(Items):
# No album found. Let's create it
self.logMsg("Album database entry missing.", 1)
emby_albumId = item.attrib.get('parentRatingKey')
album = emby.getItem(emby_albumId)
self.add_updateAlbum(album)
album = GetPlexMetadata(emby_albumId)
self.add_updateAlbum(album[0])
emby_dbalbum = emby_db.getItem_byId(emby_albumId)
try:
albumid = emby_dbalbum[0]

View File

@ -4,7 +4,6 @@
from threading import Thread, Lock
import Queue
import xml.etree.ElementTree as etree
import xbmc
import xbmcgui
@ -220,8 +219,6 @@ class LibrarySync(Thread):
"""
# Borg, even though it's planned to only have 1 instance up and running!
_shared_state = {}
# How long should we look into the past for fast syncing items (in s)
syncPast = 30
def __init__(self, queue):
self.__dict__ = self._shared_state
@ -235,6 +232,8 @@ class LibrarySync(Thread):
# How long should we wait at least to process new/changed PMS items?
self.saftyMargin = int(utils.settings('saftyMargin'))
self.fullSyncInterval = int(utils.settings('fullSyncInterval')) * 60
self.clientInfo = clientinfo.ClientInfo()
self.user = userclient.UserClient()
self.emby = embyserver.Read_EmbyServer()
@ -256,9 +255,7 @@ class LibrarySync(Thread):
# Time offset between Kodi and PMS in seconds (=Koditime - PMStime)
self.timeoffset = 0
# Time in seconds to look into the past when looking for PMS changes
# (safety margin - the larger, the more items we need to process)
self.syncPast = 30
self.lastSync = 0
Thread.__init__(self)
@ -363,7 +360,7 @@ class LibrarySync(Thread):
PF.scrobble(plexId, 'watched')
# Let the PMS process this first!
xbmc.sleep(2000)
# Get all PMS items to find the item we changed
# Get all PMS items to find the item we just changed
items = PF.GetAllPlexLeaves(libraryId,
lastViewedAt=timestamp,
containerSize=self.limitindex)
@ -380,246 +377,11 @@ class LibrarySync(Thread):
return
# Calculate time offset Kodi-PMS
self.timeoffset = int(koditime) - int(plextime)
utils.window('kodiplextimeoffset', value=str(self.timeoffset))
timeoffset = int(koditime) - int(plextime)
utils.window('kodiplextimeoffset', value=str(timeoffset))
self.logMsg("Time offset Koditime - Plextime in seconds: %s"
% str(self.timeoffset), 0)
def getPMSfromKodiTime(self, koditime):
"""
Uses self.timeoffset to return the PMS time for a given Kodi timestamp
(in unix time)
Feed with integers
"""
return koditime - self.timeoffset
def resetProcessedItems(self):
"""
Resets the list of PMS items that we have already processed
"""
self.processed = {
'movie': {},
'show': {},
'season': {},
'episode': {},
'artist': {},
'album': {},
'track': {}
}
def getFastUpdateList(self, xml, plexType, viewName, viewId):
"""
THIS METHOD NEEDS TO BE FAST! => e.g. no API calls
Adds items to self.updatelist as well as self.allPlexElementsId dict
Input:
xml: PMS answer for section items
plexType: 'movie', 'show', 'episode', ...
viewName: Name of the Plex view (e.g. 'My TV shows')
viewId: Id/Key of Plex library (e.g. '1')
Output: self.updatelist, self.allPlexElementsId
self.updatelist APPENDED(!!) list itemids (Plex Keys as
as received from API.getRatingKey())
One item in this list is of the form:
'itemId': xxx,
'itemType': 'Movies','TVShows', ...
'method': 'add_update', 'add_updateSeason', ...
'viewName': xxx,
'viewId': xxx,
'title': xxx
self.allPlexElementsId APPENDED(!!) dict
= {itemid: checksum}
"""
# Needs to call other methods than if we're only updating userdata
for item in xml:
itemId = item.attrib.get('ratingKey')
# Skipping items 'title=All episodes' without a 'ratingKey'
if not itemId:
continue
lastViewedAt = item.attrib.get('lastViewedAt')
updatedAt = item.attrib.get('updatedAt')
# returns the tuple (lastViewedAt, updatedAt) for the
# specific item
res = self.processed[plexType].get(itemId)
if res:
# Only look at the updatedAt flag!
# tuple: (lastViewedAt, updatedAt)
if res[1] == updatedAt:
# Nothing to update, we have already processed this
# item
continue
title = item.attrib.get('title', 'Missing Title Name')
# We need to process this:
self.updatelist.append({
'itemId': itemId,
'itemType': PF.GetItemClassFromType(
plexType),
'method': PF.GetMethodFromPlexType(plexType),
'viewName': viewName,
'viewId': viewId,
'title': title
})
# And safe to self.processed:
self.processed[plexType][itemId] = (lastViewedAt, updatedAt)
# Quickly log
if self.updatelist:
self.logMsg('fastSync updatelist: %s' % self.updatelist, 1)
self.logMsg('fastSync processed list: %s' % self.processed, 1)
def fastSync(self):
"""
Fast incremential lib sync
Using /library/recentlyAdded is NOT working as changes to lib items are
not reflected
This will NOT remove items from Kodi db that were removed from the PMS
(happens only during fullsync)
Items that are processed are appended to the dict self.processed:
{
'<Plex itemtype>': e.g. 'movie'
{
'<ratingKey>': ( unique plex id 'ratingKey' as str
lastViewedAt,
updatedAt
)
}
}
"""
# Get last sync time and look a bit in the past (safety margin)
lastSync = self.lastSync - self.syncPast
# Set new timestamp NOW because sync might take a while
self.saveLastSync()
# Original idea: Get all PMS items already saved in Kodi
# Also get checksums of every Plex items already saved in Kodi
self.allKodiElementsId = {}
# Run through views and get latest changed elements using time diff
self.updateKodiVideoLib = False
self.updateKodiMusicLib = False
for view in self.views:
self.updatelist = []
# Get items per view
items = PF.GetAllPlexLeaves(
view['id'],
updatedAt=self.getPMSfromKodiTime(lastSync),
containerSize=self.limitindex)
# Just skip if something went wrong
if items is None:
continue
# Get one itemtype, because they're the same in the PMS section
try:
plexType = items[0].attrib['type']
except:
# There was no child - PMS response is empty
continue
# Populate self.updatelist
self.getFastUpdateList(
items, plexType, view['name'], view['id'])
# Process self.updatelist
if self.updatelist:
if self.updatelist[0]['itemType'] in ['Movies', 'TVShows']:
self.updateKodiVideoLib = True
elif self.updatelist[0]['itemType'] == 'Music':
self.updateKodiMusicLib = True
# Do the work
self.GetAndProcessXMLs(
PF.GetItemClassFromType(plexType),
showProgress=False)
self.updatelist = []
# Update userdata DIRECTLY
# We don't need to refresh the Kodi library for deltas!!
# Start with an empty ElementTree and attach items to update
movieupdate = False
episodeupdate = False
songupdate = False
for view in self.views:
items = PF.GetAllPlexLeaves(
view['id'],
lastViewedAt=self.getPMSfromKodiTime(lastSync),
containerSize=self.limitindex)
if items is None:
continue
for item in items:
itemId = item.attrib.get('ratingKey')
# Skipping items 'title=All episodes' without a 'ratingKey'
if not itemId:
continue
plexType = item.attrib['type']
lastViewedAt = item.attrib.get('lastViewedAt')
updatedAt = item.attrib.get('updatedAt')
# returns the tuple (lastViewedAt, updatedAt) for the
# specific item
res = self.processed[plexType].get(itemId)
if res:
# Only look at lastViewedAt
if res[0] == lastViewedAt:
# Nothing to update, we have already processed this
# item
continue
if plexType == 'movie':
movieupdate = True
try:
movieXML.append(item)
except:
movieXML = etree.Element('root')
movieXML.append(item)
elif plexType == 'episode':
episodeupdate = True
try:
episodeXML.append(item)
except:
episodeXML = etree.Element('root')
episodeXML.append(item)
elif plexType == 'track':
songupdate = True
try:
musicXML.append(item)
except:
musicXML = etree.Element('root')
musicXML.append(item)
# And safe to self.processed:
self.processed[plexType][itemId] = (lastViewedAt, updatedAt)
if movieupdate:
with itemtypes.Movies() as movies:
movies.updateUserdata(movieXML)
if episodeupdate:
with itemtypes.TVShows() as tvshows:
tvshows.updateUserdata(episodeXML)
if songupdate:
with itemtypes.Music() as music:
music.updateUserdata(musicXML)
# Let Kodi update the library now (artwork and userdata)
if self.updateKodiVideoLib:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
if self.updateKodiMusicLib:
self.logMsg("Doing Kodi Music Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(music)')
# Show warning if itemtypes.py crashed at some point
if utils.window('plex_scancrashed') == 'true':
xbmcgui.Dialog().ok(self.addonName, self.__language__(39408))
utils.window('plex_scancrashed', clear=True)
return True
def saveLastSync(self):
# Save last sync time
self.lastSync = utils.getUnixTimestamp()
def initializeDBs(self):
"""
Run once during startup to verify that emby db exists.
@ -655,15 +417,15 @@ class LibrarySync(Thread):
# Add sources
utils.sourcesXML()
# Set new timestamp NOW because sync might take a while
self.saveLastSync()
# Deactivate Kodi popup showing that it's (unsuccessfully) trying to
# scan music folders
if self.enableMusic:
utils.musiclibXML()
utils.advancedSettingsXML()
# Set new timestamp NOW because sync might take a while
self.saveLastSync()
# Ensure that DBs exist if called for very first time
self.initializeDBs()
# Set views. Abort if unsuccessful
@ -1414,11 +1176,17 @@ class LibrarySync(Thread):
if currMajor > minMajor:
return True
elif (currMajor == minMajor and (currMinor > minMinor or
(currMinor == minMinor and currPatch >= minPatch))):
elif currMajor < minMajor:
return False
if currMinor > minMinor:
return True
elif currMinor < minMinor:
return False
if currPatch >= minPatch:
return True
else:
# Database out of date.
return False
def processMessage(self, message):
@ -1427,9 +1195,6 @@ class LibrarySync(Thread):
do with "process_" methods
"""
typus = message.get('type')
if typus is None:
self.logMsg('No type, dropping message: %s' % message, -1)
return
self.logMsg('Message received from websocket: %s' % message, 2)
if typus == 'playing':
self.process_playing(message['_children'])
@ -1446,102 +1211,128 @@ class LibrarySync(Thread):
del liste[index]
return liste
def process_newitems(self):
def processItems(self):
"""
Periodically called to process new/updated PMS items
PMS needs a while to download info from internet AFTER it
showed up under 'timeline' websocket messages
"""
videoLibUpdate = False
now = utils.getUnixTimestamp()
deleteListe = []
for i, item in enumerate(self.itemsToProcess):
ratingKey = item['ratingKey']
timestamp = item['timestamp']
if now - timestamp < self.saftyMargin:
# We haven't waited long enough for the PMS to finish
# processing the item
continue
xml = PF.GetPlexMetadata(ratingKey)
if xml is None:
self.logMsg('Could not download metadata for %s'
% ratingKey, -1)
continue
deleteListe.append(i)
self.logMsg("Adding new PMS item: %s" % ratingKey, 1)
viewtag = xml.attrib.get('librarySectionTitle')
viewid = xml.attrib.get('librarySectionID')
mediatype = xml[0].attrib.get('type')
if mediatype == 'movie':
# Movie
videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.add_update(xml[0],
viewtag=viewtag,
viewid=viewid)
elif mediatype == 'episode':
# Episode
videoLibUpdate = True
with itemtypes.TVShows() as show:
show.add_updateEpisode(xml[0],
viewtag=viewtag,
viewid=viewid)
# Get rid of the items we just processed
if len(deleteListe) > 0:
self.itemsToProcess = self.multi_delete(
self.itemsToProcess, deleteListe)
# Let Kodi know of the change
if videoLibUpdate is True:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
def process_timeline(self, data):
"""
PMS is messing with the library items
data['type']:
1: movie
2: tv show??
3: season??
4: episode
8: artist (band)
9: album
10: track (song)
12: trailer, extras?
"""
videoLibUpdate = False
for item in data:
if item.get('state') == 9:
# Item was deleted.
# Only care for playable type
# For some reason itemID and not ratingKey
if item.get('type') == 1:
# Movie
self.logMsg("Removing movie %s" % item.get('itemID'), 1)
videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.remove(item.get('itemID'))
elif item.get('type') == 4:
# Episode
self.logMsg("Removing episode %s" % item.get('itemID'), 1)
videoLibUpdate = True
with itemtypes.TVShows() as show:
show.remove(item.get('itemID'))
elif item.get('state') == 5 and item.get('type') in (1, 4):
# Item added or changed
# Need to process later because PMS needs to be done first
self.logMsg('New/changed PMS item detected: %s'
% item.get('itemID'), 1)
data['state']:
0: 'created',
2: 'matching',
3: 'downloading',
4: 'loading',
5: 'finished',
6: 'analyzing',
9: 'deleted'
"""
self.videoLibUpdate = False
self.musicLibUpdate = False
now = utils.getUnixTimestamp()
deleteListe = []
for i, item in enumerate(self.itemsToProcess):
if now - item['timestamp'] < self.saftyMargin:
# We haven't waited long enough for the PMS to finish
# processing the item. Do it later
continue
if item['state'] == 5:
if self.process_newitems(item) is True:
deleteListe.append(i)
elif item['state'] == 9:
if self.process_deleteditems(item) is True:
deleteListe.append(i)
# Get rid of the items we just processed
if len(deleteListe) > 0:
self.itemsToProcess = self.multi_delete(
self.itemsToProcess, deleteListe)
# Let Kodi know of the change
if self.videoLibUpdate is True:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
if self.musicLibUpdate is True:
self.logMsg("Doing Kodi Music Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(music)')
def process_newitems(self, item):
ratingKey = item['ratingKey']
xml = PF.GetPlexMetadata(ratingKey)
if xml is None:
self.logMsg('Could not download metadata for %s, skipping'
% ratingKey, -1)
return False
self.logMsg("Processing new/updated PMS item: %s" % ratingKey, 1)
viewtag = xml.attrib.get('librarySectionTitle')
viewid = xml.attrib.get('librarySectionID')
mediatype = xml[0].attrib.get('type')
if mediatype == 'movie':
self.videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.add_update(xml[0],
viewtag=viewtag,
viewid=viewid)
elif mediatype == 'episode':
self.videoLibUpdate = True
with itemtypes.TVShows() as show:
show.add_updateEpisode(xml[0],
viewtag=viewtag,
viewid=viewid)
elif mediatype == 'track':
self.musicLibUpdate = True
with itemtypes.Music() as music:
music.add_updateSong(xml[0],
viewtag=viewtag,
viewid=viewid)
return True
def process_deleteditems(self, item):
if item.get('type') == 1:
# Movie
self.logMsg("Removing movie %s" % item.get('ratingKey'), 1)
self.videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.remove(item.get('ratingKey'))
elif item.get('type') in (2, 3, 4):
self.logMsg("Removing episode/season/tv show %s"
% item.get('ratingKey'), 1)
self.videoLibUpdate = True
with itemtypes.TVShows() as show:
show.remove(item.get('ratingKey'))
elif item.get('type') in (8, 9, 10):
self.logMsg("Removing song/album/artist %s"
% item.get('ratingKey'), 1)
self.musicLibUpdate = True
with itemtypes.Music() as music:
music.remove(item.get('ratingKey'))
return True
def process_timeline(self, data):
"""
PMS is messing with the library items, e.g. new or changed. Put in our
"processing queue"
"""
for item in data:
state = item.get('state')
typus = item.get('type')
if state == 9 or (state == 5 and typus in (1, 4, 10)):
self.itemsToProcess.append({
'state': state,
'type': typus,
'ratingKey': item.get('itemID'),
'timestamp': utils.getUnixTimestamp()
})
# Let Kodi know of the change
if videoLibUpdate is True:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
def process_playing(self, data):
"""
Someone (not necessarily the user signed in) is playing something some-
@ -1625,6 +1416,12 @@ class LibrarySync(Thread):
with itemFkt() as Fkt:
Fkt.updatePlaystate(item)
def saveLastSync(self):
"""
Save last full sync time
"""
self.lastSync = utils.getUnixTimestamp()
def run(self):
try:
self.run_internal()
@ -1650,6 +1447,8 @@ class LibrarySync(Thread):
fullSync = self.fullSync
processMessage = self.processMessage
string = self.__language__
fullSyncInterval = self.fullSyncInterval
lastProcessing = 0
dialog = xbmcgui.Dialog()
@ -1657,12 +1456,8 @@ class LibrarySync(Thread):
startupComplete = False
self.views = []
count = 0
errorcount = 0
# Initialize self.processed
self.resetProcessedItems()
log("---===### Starting LibrarySync ###===---", 0)
while not threadStopped():
@ -1747,7 +1542,6 @@ class LibrarySync(Thread):
window('plex_runLibScan', clear=True)
fullSync(manualrun=True)
window('emby_dbScan', clear=True)
count = 0
# Full library sync finished
self.showKodiNote(string(39407), forced=True)
# Reset views was requested from somewhere else
@ -1774,12 +1568,10 @@ class LibrarySync(Thread):
forced=True,
icon="error")
window('emby_dbScan', clear=True)
elif enableBackgroundSync:
# Run full lib scan approx every 30min
if count >= 1800:
count = 0
# Also reset self.processed, just in case
self.resetProcessedItems()
else:
now = utils.getUnixTimestamp()
if now - self.lastSync > fullSyncInterval:
log('Doing scheduled full library scan', 1)
# Recalculate time offset Kodi - PMS
self.syncPMStime()
window('emby_dbScan', value="true")
@ -1788,13 +1580,13 @@ class LibrarySync(Thread):
window('emby_dbScan', clear=True)
# Full library sync finished
self.showKodiNote(string(39407), forced=False)
elif count % 300 == 0:
count += 1
window('emby_dbScan', value="true")
self.process_newitems()
window('emby_dbScan', clear=True)
else:
count += 1
elif enableBackgroundSync:
# Check back whether we should process something
if now - lastProcessing > 5:
window('emby_dbScan', value="true")
self.processItems()
window('emby_dbScan', clear=True)
lastProcessing = now
# See if there is a PMS message we need to handle
try:
message = queue.get(block=False)
@ -1812,6 +1604,5 @@ class LibrarySync(Thread):
continue
xbmc.sleep(100)
count += 1
log("###===--- LibrarySync Stopped ---===###", 0)

View File

@ -48,17 +48,6 @@ class WebSocket_Client(threading.Thread):
self.clientInfo = clientinfo.ClientInfo()
self.deviceId = self.clientInfo.getDeviceId()
# 'state' that can be returned by PMS
self.timeStates = {
0: 'created',
2: 'matching',
3: 'downloading',
4: 'loading',
5: 'finished',
6: 'analyzing',
9: 'deleted'
}
threading.Thread.__init__(self)
def sendProgressUpdate(self, data):
@ -88,6 +77,14 @@ class WebSocket_Client(threading.Thread):
self.logMsg('Error decoding message from websocket: %s' % e, -1)
return False
# Triage
typus = message.get('type')
if typus is None:
self.logMsg('No message type, dropping message: %s' % message, -1)
return False
# Drop everything we're not interested in
if typus not in ('playing', 'timeline'):
return
# Put PMS message on queue and let libsync take care of it
try:
self.queue.put(message)
@ -97,50 +94,19 @@ class WebSocket_Client(threading.Thread):
self.logMsg('Queue is full, dropping PMS message', 0)
return False
def processing_playing(self, message):
"""
Called when somewhere a PMS item is started, being played, stopped.
def on_message_LEGACY_EMBY(self, ws, message):
log = self.logMsg
window = utils.window
lang = utils.language
Calls Libsync with a list of children dictionaries:
{
'_elementType': e.g. 'PlaySessionStateNotification'
'guid': e.g. ''
'key': e.g. '/library/metadata/282300',
'ratingKey': e.g. '282300',
'sessionKey': e.g. '590',
'state': e.g. 'playing', 'available', 'buffering',
'stopped'
'transcodeSession': e.g. 'yv50n9p4cr',
'url': e.g. ''
'viewOffset': e.g. 1878534 (INT!)
}
"""
children = message.get('_children')
if not children:
return False
def processing_progress(self, message):
"""
Called when a PMS items keeps getting played (resume points update)
"""
def processing_timeline(self, message):
"""
Called when a PMS is in the process or has updated/added/removed a
library item
"""
children = message.get('_children')
if not children:
return False
for item in children:
state = self.timeStates.get(item.get('state'))
return True
def processing_status(self, message):
"""
Called when a PMS is scanning its libraries (to be verified)
"""
result = json.loads(message)
messageType = result['MessageType']
data = result['Data']
if messageType not in ('SessionEnded'):
# Mute certain events
log("Message: %s" % message, 1)
if messageType == "Play":
# A remote control play command has been sent from the server.
@ -316,7 +282,7 @@ class WebSocket_Client(threading.Thread):
elif messageType == "ServerRestarting":
if utils.settings('supressRestartMsg') == "true":
xbmcgui.Dialog().notification(
heading=self.addonName,
heading="Emby for Kodi",
message=lang(33006),
icon="special://home/addons/plugin.video.emby/icon.png")
@ -330,6 +296,7 @@ class WebSocket_Client(threading.Thread):
def on_open(self, ws):
return
# Can we post something to Plex?
self.doUtils.postCapabilities(self.deviceId)
def on_error(self, ws, error):
@ -346,11 +313,9 @@ class WebSocket_Client(threading.Thread):
# websocket.enableTrace(True)
userId = window('currUserId')
server = window('pms_server')
# Need to use plex.tv token, if any
# Need to use plex.tv token, if any. NOT user token
token = window('plex_token')
deviceId = self.deviceId
# Get the appropriate prefix for the websocket
if "https" in server:

View File

@ -60,6 +60,7 @@
<setting type="lsep" label="39052" /><!-- Background Sync -->
<setting id="enableBackgroundSync" type="bool" label="39026" default="true" visible="true"/>
<setting id="saftyMargin" type="slider" label="39051" default="30" option="int" range="10,1,300" visible="eq(-1,true)"/>
<setting id="fullSyncInterval" type="number" label="39053" default="30" option="int" />
<setting type="lsep" label="30538" /><!-- Complete Re-Sync necessary -->
<setting id="enableMusic" type="bool" label="30509" default="true" />