From 5949988b68da1683dad99203d5b9d195ceea479f Mon Sep 17 00:00:00 2001 From: tomkat83 Date: Thu, 24 Mar 2016 18:52:02 +0100 Subject: [PATCH] Plex websockets - groundworks 2 --- resources/lib/PlexFunctions.py | 10 + resources/lib/itemtypes.py | 11 ++ resources/lib/kodidb_functions.py | 45 ++++- resources/lib/librarysync.py | 244 ++++++++++++++++++++---- resources/lib/plexbmchelper/settings.py | 7 +- resources/lib/websocket.py | 12 +- resources/lib/websocket_client.py | 35 ++-- service.py | 17 +- 8 files changed, 312 insertions(+), 69 deletions(-) diff --git a/resources/lib/PlexFunctions.py b/resources/lib/PlexFunctions.py index ab50e509..4aa82ee1 100644 --- a/resources/lib/PlexFunctions.py +++ b/resources/lib/PlexFunctions.py @@ -31,6 +31,8 @@ def ConvertPlexToKodiTime(plexTime): """ Converts Plextime to Koditime. Returns an int (in seconds). """ + if plexTime is None: + return None return int(float(plexTime) * PlexToKodiTimefactor()) @@ -47,6 +49,14 @@ def GetItemClassFromType(itemType): return classes[itemType] +def GetItemClassFromNumber(itemType): + classes = { + 1: 'Movies', + 4: 'TVShows', + } + return classes[itemType] + + def GetKodiTypeFromPlex(plexItemType): """ As used in playlist.item here: http://kodi.wiki/view/JSON-RPC_API diff --git a/resources/lib/itemtypes.py b/resources/lib/itemtypes.py index c31720e7..d400b707 100644 --- a/resources/lib/itemtypes.py +++ b/resources/lib/itemtypes.py @@ -257,6 +257,17 @@ class Items(object): userdata['PlayCount'], userdata['LastPlayedDate']) + def updatePlaystate(self, item): + if item['duration'] is None: + item['duration'] = self.kodi_db.getVideoRuntime(item['kodi_id'], + item['kodi_type']) + self.logMsg('Updating item with: %s' % item, 0) + self.kodi_db.addPlaystate(item['file_id'], + item['viewOffset'], + item['duration'], + item['viewCount'], + item['lastViewedAt']) + class Movies(Items): diff --git a/resources/lib/kodidb_functions.py b/resources/lib/kodidb_functions.py index 6bb37d8a..da9551e3 100644 --- a/resources/lib/kodidb_functions.py +++ b/resources/lib/kodidb_functions.py @@ -857,10 +857,28 @@ class Kodidb_Functions(): ids.append(row[0]) return ids - def addPlaystate(self, fileid, resume_seconds, total_seconds, playcount, dateplayed): - - cursor = self.cursor + def getVideoRuntime(self, kodiid, mediatype): + if mediatype == 'movie': + query = ' '.join(( + "SELECT c11", + "FROM movie", + "WHERE idMovie = ?", + )) + elif mediatype == 'episode': + query = ' '.join(( + "SELECT c09", + "FROM episode", + "WHERE idEpisode = ?", + )) + self.cursor.execute(query, (kodiid,)) + try: + runtime = self.cursor.fetchone()[0] + except TypeError: + return None + return int(runtime) + def addPlaystate(self, fileid, resume_seconds, total_seconds, playcount, dateplayed): + cursor = self.cursor # Delete existing resume point query = ' '.join(( @@ -870,13 +888,20 @@ class Kodidb_Functions(): cursor.execute(query, (fileid,)) # Set watched count - query = ' '.join(( - - "UPDATE files", - "SET playCount = ?, lastPlayed = ?", - "WHERE idFile = ?" - )) - cursor.execute(query, (playcount, dateplayed, fileid)) + if playcount is None: + query = ' '.join(( + "UPDATE files", + "SET lastPlayed = ?", + "WHERE idFile = ?" + )) + cursor.execute(query, (dateplayed, fileid)) + else: + query = ' '.join(( + "UPDATE files", + "SET playCount = ?, lastPlayed = ?", + "WHERE idFile = ?" + )) + cursor.execute(query, (playcount, dateplayed, fileid)) # Set the resume bookmark if resume_seconds: diff --git a/resources/lib/librarysync.py b/resources/lib/librarysync.py index f097d1df..b7aa524d 100644 --- a/resources/lib/librarysync.py +++ b/resources/lib/librarysync.py @@ -21,7 +21,7 @@ import read_embyserver as embyserver import userclient import videonodes -import PlexFunctions +import PlexFunctions as PF ############################################################################### @@ -66,7 +66,7 @@ class ThreadedGetMetadata(Thread): xbmc.sleep(100) continue # Download Metadata - plexXML = PlexFunctions.GetPlexMetadata(updateItem['itemId']) + plexXML = PF.GetPlexMetadata(updateItem['itemId']) if plexXML is None: # Did not receive a valid XML - skip that item for now self.logMsg("Could not get metadata for %s. " @@ -209,17 +209,27 @@ class ThreadedShowSyncInfo(Thread): @utils.ThreadMethodsAdditionalStop('emby_shouldStop') @utils.ThreadMethods class LibrarySync(Thread): + """ + librarysync.LibrarySync(queue) + + where (communication with websockets) + queue: Queue object for background sync + """ # Borg, even though it's planned to only have 1 instance up and running! _shared_state = {} # How long should we look into the past for fast syncing items (in s) syncPast = 30 - def __init__(self): - + def __init__(self, queue): self.__dict__ = self._shared_state self.__language__ = xbmcaddon.Addon().getLocalizedString + # Communication with websockets + self.queue = queue + self.itemsToProcess = [] + self.safteyMargin = 30 + self.clientInfo = clientinfo.ClientInfo() self.user = userclient.UserClient() self.emby = embyserver.Read_EmbyServer() @@ -325,7 +335,7 @@ class LibrarySync(Thread): return # Get the Plex item's metadata - xml = PlexFunctions.GetPlexMetadata(plexId) + xml = PF.GetPlexMetadata(plexId) if xml is None: self.logMsg("Could not download metadata, aborting time sync", -1) return @@ -341,15 +351,15 @@ class LibrarySync(Thread): # Set the timer koditime = utils.getUnixTimestamp() # Toggle watched state - PlexFunctions.scrobble(plexId, 'watched') + PF.scrobble(plexId, 'watched') # Let the PMS process this first! xbmc.sleep(2000) # Get all PMS items to find the item we changed - items = PlexFunctions.GetAllPlexLeaves(libraryId, - lastViewedAt=timestamp, - containerSize=self.limitindex) + items = PF.GetAllPlexLeaves(libraryId, + lastViewedAt=timestamp, + containerSize=self.limitindex) # Toggle watched state back - PlexFunctions.scrobble(plexId, 'unwatched') + PF.scrobble(plexId, 'unwatched') # Get server timestamp for this change plextime = None for item in items: @@ -438,9 +448,9 @@ class LibrarySync(Thread): # We need to process this: self.updatelist.append({ 'itemId': itemId, - 'itemType': PlexFunctions.GetItemClassFromType( + 'itemType': PF.GetItemClassFromType( plexType), - 'method': PlexFunctions.GetMethodFromPlexType(plexType), + 'method': PF.GetMethodFromPlexType(plexType), 'viewName': viewName, 'viewId': viewId, 'title': title @@ -488,7 +498,7 @@ class LibrarySync(Thread): for view in self.views: self.updatelist = [] # Get items per view - items = PlexFunctions.GetAllPlexLeaves( + items = PF.GetAllPlexLeaves( view['id'], updatedAt=self.getPMSfromKodiTime(lastSync), containerSize=self.limitindex) @@ -512,7 +522,7 @@ class LibrarySync(Thread): self.updateKodiMusicLib = True # Do the work self.GetAndProcessXMLs( - PlexFunctions.GetItemClassFromType(plexType), + PF.GetItemClassFromType(plexType), showProgress=False) self.updatelist = [] @@ -524,7 +534,7 @@ class LibrarySync(Thread): episodeupdate = False songupdate = False for view in self.views: - items = PlexFunctions.GetAllPlexLeaves( + items = PF.GetAllPlexLeaves( view['id'], lastViewedAt=self.getPMSfromKodiTime(lastSync), containerSize=self.limitindex) @@ -1079,7 +1089,7 @@ class LibrarySync(Thread): # Get items per view viewId = view['id'] viewName = view['name'] - all_plexmovies = PlexFunctions.GetPlexSectionResults( + all_plexmovies = PF.GetPlexSectionResults( viewId, args=None, containerSize=self.limitindex) if all_plexmovies is None: self.logMsg("Couldnt get section items, aborting for view.", 1) @@ -1115,10 +1125,10 @@ class LibrarySync(Thread): also updates resume times. This is done by downloading one XML for ALL elements with viewId """ - xml = PlexFunctions.GetAllPlexLeaves(viewId, - lastViewedAt=lastViewedAt, - updatedAt=updatedAt, - containerSize=self.limitindex) + xml = PF.GetAllPlexLeaves(viewId, + lastViewedAt=lastViewedAt, + updatedAt=updatedAt, + containerSize=self.limitindex) # Return if there are no items in PMS reply - it's faster try: xml[0].attrib @@ -1213,7 +1223,7 @@ class LibrarySync(Thread): # Get items per view viewId = view['id'] viewName = view['name'] - allPlexTvShows = PlexFunctions.GetPlexSectionResults( + allPlexTvShows = PF.GetPlexSectionResults( viewId, containerSize=self.limitindex) if allPlexTvShows is None: self.logMsg( @@ -1240,7 +1250,7 @@ class LibrarySync(Thread): if self.threadStopped(): return False # Grab all seasons to tvshow from PMS - seasons = PlexFunctions.GetAllPlexChildren( + seasons = PF.GetAllPlexChildren( tvShowId, containerSize=self.limitindex) if seasons is None: self.logMsg( @@ -1265,7 +1275,7 @@ class LibrarySync(Thread): if self.threadStopped(): return False # Grab all episodes to tvshow from PMS - episodes = PlexFunctions.GetAllPlexLeaves( + episodes = PF.GetAllPlexLeaves( view['id'], containerSize=self.limitindex) if episodes is None: self.logMsg( @@ -1288,7 +1298,7 @@ class LibrarySync(Thread): # Cycle through tv shows with itemtypes.TVShows() as TVshow: for tvShowId in allPlexTvShowsId: - XMLtvshow = PlexFunctions.GetPlexMetadata(tvShowId) + XMLtvshow = PF.GetPlexMetadata(tvShowId) TVshow.refreshSeasonEntry(XMLtvshow, tvShowId) self.logMsg("Season info refreshed", 1) @@ -1363,7 +1373,7 @@ class LibrarySync(Thread): # Get items per view viewId = view['id'] viewName = view['name'] - itemsXML = PlexFunctions.GetPlexSectionResults( + itemsXML = PF.GetPlexSectionResults( viewId, args=urlArgs, containerSize=self.limitindex) if itemsXML is None: self.logMsg("Error downloading xml for view %s" @@ -1401,6 +1411,159 @@ class LibrarySync(Thread): # Database out of date. return False + def processMessage(self, message): + """ + processes json.loads() messages from websocket. Triage what we need to + do with "process_" methods + """ + typus = message.get('type') + if typus is None: + self.logMsg('No type, dropping message: %s' % message, -1) + return + + if typus == 'playing': + self.process_playing(message['_children']) + elif typus == 'timeline': + self.process_timeline(message['_children']) + else: + self.logMsg('Dropping message: %s' % message, -1) + + def multi_delete(self, liste, deleteListe): + """ + Deletes the list items of liste at the positions in deleteListe + """ + indexes = sorted(deleteListe, reverse=True) + for index in indexes: + del liste[index] + return liste + + def process_newitems(self): + """ + Periodically called to process new/updated PMS items + + PMS needs a while to download info from internet AFTER it + showed up under 'timeline' websocket messages + """ + videoLibUpdate = False + now = utils.getUnixTimestamp() + deleteListe = [] + for i, item in enumerate(self.itemsToProcess): + ratingKey = item['ratingKey'] + timestamp = item['timestamp'] + if now - timestamp < self.safteyMargin: + # We haven't waited long enough for the PMS to finish + # processing the item + continue + xml = PF.GetPlexMetadata(ratingKey) + if xml is None: + self.logMsg('Could not download metadata for %s' + % ratingKey, -1) + continue + deleteListe.append(i) + self.logMsg("Adding new PMS item: %s" % ratingKey, 1) + viewtag = xml.attrib.get('librarySectionTitle') + viewid = xml.attrib.get('librarySectionID') + mediatype = xml[0].attrib.get('type') + if mediatype == 'movie': + # Movie + videoLibUpdate = True + with itemtypes.Movies() as movie: + movie.add_update(xml[0], + viewtag=viewtag, + viewid=viewid) + elif mediatype == 'episode': + # Episode + videoLibUpdate = True + with itemtypes.TVShows() as show: + show.add_updateEpisode(xml[0], + viewtag=viewtag, + viewid=viewid) + + # Get rid of the items we just processed + if len(deleteListe) > 0: + self.itemsToProcess = self.multi_delete( + self.itemsToProcess, deleteListe) + # Let Kodi know of the change + if videoLibUpdate is True: + self.logMsg("Doing Kodi Video Lib update", 1) + xbmc.executebuiltin('UpdateLibrary(video)') + + def process_timeline(self, data): + """ + PMS is messing with the library items + + data['type']: + 1: movie + 2: tv show?? + 3: season?? + 4: episode + 12: trailer, extras? + """ + videoLibUpdate = False + for item in data: + if item.get('state') == 9: + # Item was deleted. + # Only care for playable type + # For some reason itemID and not ratingKey + if item.get('type') == 1: + # Movie + self.logMsg("Removing movie %s" % item.get('itemID'), 1) + videoLibUpdate = True + with itemtypes.Movies() as movie: + movie.remove(item.get('itemID')) + elif item.get('type') == 4: + # Episode + self.logMsg("Removing episode %s" % item.get('itemID'), 1) + videoLibUpdate = True + with itemtypes.TVShows() as show: + show.remove(item.get('itemID')) + + elif item.get('state') == 5 and item.get('type') in (1, 4): + # Item added or changed + # Need to process later because PMS needs to be done first + self.logMsg('New/changed PMS item: %s' % item.get('itemID'), 1) + self.itemsToProcess.append({ + 'ratingKey': item.get('itemID'), + 'timestamp': utils.getUnixTimestamp() + }) + + # Let Kodi know of the change + if videoLibUpdate is True: + self.logMsg("Doing Kodi Video Lib update", 1) + xbmc.executebuiltin('UpdateLibrary(video)') + + def process_playing(self, data): + items = [] + with embydb.GetEmbyDB() as emby_db: + for item in data: + # Drop buffering messages + state = item.get('state') + if state == 'buffering': + continue + ratingKey = item.get('ratingKey') + kodiInfo = emby_db.getItem_byId(ratingKey) + if kodiInfo is None: + # Item not (yet) in Kodi library + continue + items.append({ + 'ratingKey': ratingKey, + 'kodi_id': kodiInfo[0], + 'file_id': kodiInfo[1], + 'kodi_type': kodiInfo[4], + 'viewOffset': PF.ConvertPlexToKodiTime( + item.get('viewOffset')), + 'state': state, + 'duration': PF.ConvertPlexToKodiTime( + item.get('duration')), + 'viewCount': item.get('viewCount'), + 'lastViewedAt': utils.DateToKodi(utils.getUnixTimestamp()) + }) + for item in items: + itemFkt = getattr(itemtypes, + PF.GetItemClassFromType(item['kodi_type'])) + with itemFkt() as Fkt: + Fkt.updatePlaystate(item) + def run(self): try: self.run_internal() @@ -1424,11 +1587,13 @@ class LibrarySync(Thread): installSyncDone = self.installSyncDone enableBackgroundSync = self.enableBackgroundSync fullSync = self.fullSync - fastSync = self.fastSync + processMessage = self.processMessage string = self.__language__ dialog = xbmcgui.Dialog() + queue = self.queue + startupComplete = False self.views = [] count = 0 @@ -1563,16 +1728,27 @@ class LibrarySync(Thread): window('emby_dbScan', clear=True) # Full library sync finished self.showKodiNote(string(39407), forced=False) - # Run fast sync otherwise (ever second or so) + elif count % 300 == 0: + count += 1 + self.process_newitems() else: - window('emby_dbScan', value="true") - if not fastSync(): - # Fast sync failed or server plugin is not found - log("Something went wrong, starting full sync", -1) - fullSync(manualrun=True) - window('emby_dbScan', clear=True) + count += 1 + # See if there is a PMS message we need to handle + try: + message = queue.get(block=False) + # Empty queue + except Queue.Empty: + xbmc.sleep(100) + continue + # Got a message from PMS; process it + else: + window('emby_dbScan', value="true") + processMessage(message) + window('emby_dbScan', clear=True) + # NO sleep! + continue - xbmc.sleep(1000) + xbmc.sleep(100) count += 1 log("###===--- LibrarySync Stopped ---===###", 0) diff --git a/resources/lib/plexbmchelper/settings.py b/resources/lib/plexbmchelper/settings.py index 1aef4944..3db6de5f 100644 --- a/resources/lib/plexbmchelper/settings.py +++ b/resources/lib/plexbmchelper/settings.py @@ -6,9 +6,11 @@ import utils settings = {} try: - guidoc = parse(xbmc.translatePath('special://userdata/guisettings.xml')) + path = xbmc.translatePath( + 'special://userdata/guisettings.xml').decode('utf-8') + guidoc = parse(path) except: - print "Unable to read XBMC's guisettings.xml" + print "PlexKodiConnect - Unable to read XBMC's guisettings.xml" def getGUI(name): global guidoc @@ -36,6 +38,7 @@ for entry in kodiSettingsList: settings['client_name'] = plexbmc.getSetting('deviceName') # XBMC web server settings +xbmc.sleep(5000) settings['webserver_enabled'] = (getGUI('webserver') == "true") settings['port'] = int(getGUI('webserverport')) settings['user'] = getGUI('webserverusername') diff --git a/resources/lib/websocket.py b/resources/lib/websocket.py index 3d777a97..eafd471a 100644 --- a/resources/lib/websocket.py +++ b/resources/lib/websocket.py @@ -46,6 +46,8 @@ import logging import traceback import sys +import xbmc + """ websocket python client. ========================= @@ -728,9 +730,12 @@ class WebSocket(object): except socket.timeout as e: raise WebSocketTimeoutException(e.args[0]) except Exception as e: - if "timed out" in e.args[0]: - raise WebSocketTimeoutException(e.args[0]) - else: + try: + if "timed out" in e.args[0]: + raise WebSocketTimeoutException(e.args[0]) + else: + raise e + except: raise e def _recv(self, bufsize): @@ -879,6 +884,7 @@ class WebSocketApp(object): #print str(e.args[0]) if "timed out" not in e.args[0]: raise e + xbmc.sleep(100) except Exception, e: self._callback(self.on_error, e) diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index d7d28068..5004a372 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -4,6 +4,7 @@ import json import threading +import Queue import websocket import ssl @@ -26,20 +27,27 @@ logging.basicConfig() @utils.logging @utils.ThreadMethods class WebSocket_Client(threading.Thread): + """ + websocket_client.WebSocket_Client(queue) + where (communication with librarysync) + queue: Queue object for background sync + """ _shared_state = {} client = None stopWebsocket = False - def __init__(self): + def __init__(self, queue): self.__dict__ = self._shared_state + # Communication with librarysync + self.queue = queue + self.doUtils = downloadutils.DownloadUtils() self.clientInfo = clientinfo.ClientInfo() self.deviceId = self.clientInfo.getDeviceId() - self.librarySync = librarysync.LibrarySync() # 'state' that can be returned by PMS self.timeStates = { @@ -77,23 +85,18 @@ class WebSocket_Client(threading.Thread): """ try: message = json.loads(message) - except Exception as ex: - self.logMsg('Error decoding message from websocket: %s' % ex, -1) - self.logMsg(message, -1) + except Exception as e: + self.logMsg('Error decoding message from websocket: %s' % e, -1) return False - typus = message.get('type') - if not typus: - return False - - process_func = getattr(self, 'processing_%s' % typus, None) - if process_func and process_func(message): + # Put PMS message on queue and let libsync take care of it + try: + self.queue.put(message) return True - - # Something went wrong; log - self.logMsg("Error processing PMS websocket message.", -1) - self.logMsg("Received websocket message from PMS: %s" % message, -1) - return True + except Queue.Full: + # Queue only takes 100 messages. No worries if we miss one or two + self.logMsg('Queue is full, dropping PMS message', 0) + return False def processing_playing(self, message): """ diff --git a/service.py b/service.py index d1fdc473..4f4654cc 100644 --- a/service.py +++ b/service.py @@ -5,6 +5,7 @@ import os import sys from datetime import datetime +import Queue import xbmc import xbmcaddon @@ -109,10 +110,13 @@ class Service(): # Server auto-detect initialsetup.InitialSetup().setup() + # Queue and lock for background sync + queue = Queue.LifoQueue(maxsize=100) + # Initialize important threads user = userclient.UserClient() - ws = wsc.WebSocket_Client() - library = librarysync.LibrarySync() + ws = wsc.WebSocket_Client(queue) + library = librarysync.LibrarySync(queue) kplayer = player.Player() xplayer = xbmc.Player() plx = PlexAPI.PlexAPI() @@ -307,14 +311,19 @@ class Service(): except: xbmc.log('User client already shut down') + try: + downloadutils.DownloadUtils().stopSession() + except: + pass + log("======== STOP %s ========" % self.addonName, 0) # Delay option delay = int(utils.settings('startupDelay')) xbmc.log("Delaying Plex startup by: %s sec..." % delay) -# Plex: add 5 seconds just for good measure -if delay and xbmc.Monitor().waitForAbort(delay+5): +# Plex: add 10 seconds just for good measure +if delay and xbmc.Monitor().waitForAbort(delay+10): # Start the service xbmc.log("Abort requested while waiting. Emby for kodi not started.") else: