Plex websockets - groundworks 2

This commit is contained in:
tomkat83 2016-03-24 18:52:02 +01:00
parent ed5483e2ae
commit 5949988b68
8 changed files with 312 additions and 69 deletions

View file

@ -31,6 +31,8 @@ def ConvertPlexToKodiTime(plexTime):
"""
Converts Plextime to Koditime. Returns an int (in seconds).
"""
if plexTime is None:
return None
return int(float(plexTime) * PlexToKodiTimefactor())
@ -47,6 +49,14 @@ def GetItemClassFromType(itemType):
return classes[itemType]
def GetItemClassFromNumber(itemType):
classes = {
1: 'Movies',
4: 'TVShows',
}
return classes[itemType]
def GetKodiTypeFromPlex(plexItemType):
"""
As used in playlist.item here: http://kodi.wiki/view/JSON-RPC_API

View file

@ -257,6 +257,17 @@ class Items(object):
userdata['PlayCount'],
userdata['LastPlayedDate'])
def updatePlaystate(self, item):
if item['duration'] is None:
item['duration'] = self.kodi_db.getVideoRuntime(item['kodi_id'],
item['kodi_type'])
self.logMsg('Updating item with: %s' % item, 0)
self.kodi_db.addPlaystate(item['file_id'],
item['viewOffset'],
item['duration'],
item['viewCount'],
item['lastViewedAt'])
class Movies(Items):

View file

@ -857,10 +857,28 @@ class Kodidb_Functions():
ids.append(row[0])
return ids
def getVideoRuntime(self, kodiid, mediatype):
if mediatype == 'movie':
query = ' '.join((
"SELECT c11",
"FROM movie",
"WHERE idMovie = ?",
))
elif mediatype == 'episode':
query = ' '.join((
"SELECT c09",
"FROM episode",
"WHERE idEpisode = ?",
))
self.cursor.execute(query, (kodiid,))
try:
runtime = self.cursor.fetchone()[0]
except TypeError:
return None
return int(runtime)
def addPlaystate(self, fileid, resume_seconds, total_seconds, playcount, dateplayed):
cursor = self.cursor
# Delete existing resume point
query = ' '.join((
@ -870,13 +888,20 @@ class Kodidb_Functions():
cursor.execute(query, (fileid,))
# Set watched count
query = ' '.join((
"UPDATE files",
"SET playCount = ?, lastPlayed = ?",
"WHERE idFile = ?"
))
cursor.execute(query, (playcount, dateplayed, fileid))
if playcount is None:
query = ' '.join((
"UPDATE files",
"SET lastPlayed = ?",
"WHERE idFile = ?"
))
cursor.execute(query, (dateplayed, fileid))
else:
query = ' '.join((
"UPDATE files",
"SET playCount = ?, lastPlayed = ?",
"WHERE idFile = ?"
))
cursor.execute(query, (playcount, dateplayed, fileid))
# Set the resume bookmark
if resume_seconds:

View file

@ -21,7 +21,7 @@ import read_embyserver as embyserver
import userclient
import videonodes
import PlexFunctions
import PlexFunctions as PF
###############################################################################
@ -66,7 +66,7 @@ class ThreadedGetMetadata(Thread):
xbmc.sleep(100)
continue
# Download Metadata
plexXML = PlexFunctions.GetPlexMetadata(updateItem['itemId'])
plexXML = PF.GetPlexMetadata(updateItem['itemId'])
if plexXML is None:
# Did not receive a valid XML - skip that item for now
self.logMsg("Could not get metadata for %s. "
@ -209,17 +209,27 @@ class ThreadedShowSyncInfo(Thread):
@utils.ThreadMethodsAdditionalStop('emby_shouldStop')
@utils.ThreadMethods
class LibrarySync(Thread):
"""
librarysync.LibrarySync(queue)
where (communication with websockets)
queue: Queue object for background sync
"""
# Borg, even though it's planned to only have 1 instance up and running!
_shared_state = {}
# How long should we look into the past for fast syncing items (in s)
syncPast = 30
def __init__(self):
def __init__(self, queue):
self.__dict__ = self._shared_state
self.__language__ = xbmcaddon.Addon().getLocalizedString
# Communication with websockets
self.queue = queue
self.itemsToProcess = []
self.safteyMargin = 30
self.clientInfo = clientinfo.ClientInfo()
self.user = userclient.UserClient()
self.emby = embyserver.Read_EmbyServer()
@ -325,7 +335,7 @@ class LibrarySync(Thread):
return
# Get the Plex item's metadata
xml = PlexFunctions.GetPlexMetadata(plexId)
xml = PF.GetPlexMetadata(plexId)
if xml is None:
self.logMsg("Could not download metadata, aborting time sync", -1)
return
@ -341,15 +351,15 @@ class LibrarySync(Thread):
# Set the timer
koditime = utils.getUnixTimestamp()
# Toggle watched state
PlexFunctions.scrobble(plexId, 'watched')
PF.scrobble(plexId, 'watched')
# Let the PMS process this first!
xbmc.sleep(2000)
# Get all PMS items to find the item we changed
items = PlexFunctions.GetAllPlexLeaves(libraryId,
lastViewedAt=timestamp,
containerSize=self.limitindex)
items = PF.GetAllPlexLeaves(libraryId,
lastViewedAt=timestamp,
containerSize=self.limitindex)
# Toggle watched state back
PlexFunctions.scrobble(plexId, 'unwatched')
PF.scrobble(plexId, 'unwatched')
# Get server timestamp for this change
plextime = None
for item in items:
@ -438,9 +448,9 @@ class LibrarySync(Thread):
# We need to process this:
self.updatelist.append({
'itemId': itemId,
'itemType': PlexFunctions.GetItemClassFromType(
'itemType': PF.GetItemClassFromType(
plexType),
'method': PlexFunctions.GetMethodFromPlexType(plexType),
'method': PF.GetMethodFromPlexType(plexType),
'viewName': viewName,
'viewId': viewId,
'title': title
@ -488,7 +498,7 @@ class LibrarySync(Thread):
for view in self.views:
self.updatelist = []
# Get items per view
items = PlexFunctions.GetAllPlexLeaves(
items = PF.GetAllPlexLeaves(
view['id'],
updatedAt=self.getPMSfromKodiTime(lastSync),
containerSize=self.limitindex)
@ -512,7 +522,7 @@ class LibrarySync(Thread):
self.updateKodiMusicLib = True
# Do the work
self.GetAndProcessXMLs(
PlexFunctions.GetItemClassFromType(plexType),
PF.GetItemClassFromType(plexType),
showProgress=False)
self.updatelist = []
@ -524,7 +534,7 @@ class LibrarySync(Thread):
episodeupdate = False
songupdate = False
for view in self.views:
items = PlexFunctions.GetAllPlexLeaves(
items = PF.GetAllPlexLeaves(
view['id'],
lastViewedAt=self.getPMSfromKodiTime(lastSync),
containerSize=self.limitindex)
@ -1079,7 +1089,7 @@ class LibrarySync(Thread):
# Get items per view
viewId = view['id']
viewName = view['name']
all_plexmovies = PlexFunctions.GetPlexSectionResults(
all_plexmovies = PF.GetPlexSectionResults(
viewId, args=None, containerSize=self.limitindex)
if all_plexmovies is None:
self.logMsg("Couldnt get section items, aborting for view.", 1)
@ -1115,10 +1125,10 @@ class LibrarySync(Thread):
also updates resume times.
This is done by downloading one XML for ALL elements with viewId
"""
xml = PlexFunctions.GetAllPlexLeaves(viewId,
lastViewedAt=lastViewedAt,
updatedAt=updatedAt,
containerSize=self.limitindex)
xml = PF.GetAllPlexLeaves(viewId,
lastViewedAt=lastViewedAt,
updatedAt=updatedAt,
containerSize=self.limitindex)
# Return if there are no items in PMS reply - it's faster
try:
xml[0].attrib
@ -1213,7 +1223,7 @@ class LibrarySync(Thread):
# Get items per view
viewId = view['id']
viewName = view['name']
allPlexTvShows = PlexFunctions.GetPlexSectionResults(
allPlexTvShows = PF.GetPlexSectionResults(
viewId, containerSize=self.limitindex)
if allPlexTvShows is None:
self.logMsg(
@ -1240,7 +1250,7 @@ class LibrarySync(Thread):
if self.threadStopped():
return False
# Grab all seasons to tvshow from PMS
seasons = PlexFunctions.GetAllPlexChildren(
seasons = PF.GetAllPlexChildren(
tvShowId, containerSize=self.limitindex)
if seasons is None:
self.logMsg(
@ -1265,7 +1275,7 @@ class LibrarySync(Thread):
if self.threadStopped():
return False
# Grab all episodes to tvshow from PMS
episodes = PlexFunctions.GetAllPlexLeaves(
episodes = PF.GetAllPlexLeaves(
view['id'], containerSize=self.limitindex)
if episodes is None:
self.logMsg(
@ -1288,7 +1298,7 @@ class LibrarySync(Thread):
# Cycle through tv shows
with itemtypes.TVShows() as TVshow:
for tvShowId in allPlexTvShowsId:
XMLtvshow = PlexFunctions.GetPlexMetadata(tvShowId)
XMLtvshow = PF.GetPlexMetadata(tvShowId)
TVshow.refreshSeasonEntry(XMLtvshow, tvShowId)
self.logMsg("Season info refreshed", 1)
@ -1363,7 +1373,7 @@ class LibrarySync(Thread):
# Get items per view
viewId = view['id']
viewName = view['name']
itemsXML = PlexFunctions.GetPlexSectionResults(
itemsXML = PF.GetPlexSectionResults(
viewId, args=urlArgs, containerSize=self.limitindex)
if itemsXML is None:
self.logMsg("Error downloading xml for view %s"
@ -1401,6 +1411,159 @@ class LibrarySync(Thread):
# Database out of date.
return False
def processMessage(self, message):
"""
processes json.loads() messages from websocket. Triage what we need to
do with "process_" methods
"""
typus = message.get('type')
if typus is None:
self.logMsg('No type, dropping message: %s' % message, -1)
return
if typus == 'playing':
self.process_playing(message['_children'])
elif typus == 'timeline':
self.process_timeline(message['_children'])
else:
self.logMsg('Dropping message: %s' % message, -1)
def multi_delete(self, liste, deleteListe):
"""
Deletes the list items of liste at the positions in deleteListe
"""
indexes = sorted(deleteListe, reverse=True)
for index in indexes:
del liste[index]
return liste
def process_newitems(self):
"""
Periodically called to process new/updated PMS items
PMS needs a while to download info from internet AFTER it
showed up under 'timeline' websocket messages
"""
videoLibUpdate = False
now = utils.getUnixTimestamp()
deleteListe = []
for i, item in enumerate(self.itemsToProcess):
ratingKey = item['ratingKey']
timestamp = item['timestamp']
if now - timestamp < self.safteyMargin:
# We haven't waited long enough for the PMS to finish
# processing the item
continue
xml = PF.GetPlexMetadata(ratingKey)
if xml is None:
self.logMsg('Could not download metadata for %s'
% ratingKey, -1)
continue
deleteListe.append(i)
self.logMsg("Adding new PMS item: %s" % ratingKey, 1)
viewtag = xml.attrib.get('librarySectionTitle')
viewid = xml.attrib.get('librarySectionID')
mediatype = xml[0].attrib.get('type')
if mediatype == 'movie':
# Movie
videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.add_update(xml[0],
viewtag=viewtag,
viewid=viewid)
elif mediatype == 'episode':
# Episode
videoLibUpdate = True
with itemtypes.TVShows() as show:
show.add_updateEpisode(xml[0],
viewtag=viewtag,
viewid=viewid)
# Get rid of the items we just processed
if len(deleteListe) > 0:
self.itemsToProcess = self.multi_delete(
self.itemsToProcess, deleteListe)
# Let Kodi know of the change
if videoLibUpdate is True:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
def process_timeline(self, data):
"""
PMS is messing with the library items
data['type']:
1: movie
2: tv show??
3: season??
4: episode
12: trailer, extras?
"""
videoLibUpdate = False
for item in data:
if item.get('state') == 9:
# Item was deleted.
# Only care for playable type
# For some reason itemID and not ratingKey
if item.get('type') == 1:
# Movie
self.logMsg("Removing movie %s" % item.get('itemID'), 1)
videoLibUpdate = True
with itemtypes.Movies() as movie:
movie.remove(item.get('itemID'))
elif item.get('type') == 4:
# Episode
self.logMsg("Removing episode %s" % item.get('itemID'), 1)
videoLibUpdate = True
with itemtypes.TVShows() as show:
show.remove(item.get('itemID'))
elif item.get('state') == 5 and item.get('type') in (1, 4):
# Item added or changed
# Need to process later because PMS needs to be done first
self.logMsg('New/changed PMS item: %s' % item.get('itemID'), 1)
self.itemsToProcess.append({
'ratingKey': item.get('itemID'),
'timestamp': utils.getUnixTimestamp()
})
# Let Kodi know of the change
if videoLibUpdate is True:
self.logMsg("Doing Kodi Video Lib update", 1)
xbmc.executebuiltin('UpdateLibrary(video)')
def process_playing(self, data):
items = []
with embydb.GetEmbyDB() as emby_db:
for item in data:
# Drop buffering messages
state = item.get('state')
if state == 'buffering':
continue
ratingKey = item.get('ratingKey')
kodiInfo = emby_db.getItem_byId(ratingKey)
if kodiInfo is None:
# Item not (yet) in Kodi library
continue
items.append({
'ratingKey': ratingKey,
'kodi_id': kodiInfo[0],
'file_id': kodiInfo[1],
'kodi_type': kodiInfo[4],
'viewOffset': PF.ConvertPlexToKodiTime(
item.get('viewOffset')),
'state': state,
'duration': PF.ConvertPlexToKodiTime(
item.get('duration')),
'viewCount': item.get('viewCount'),
'lastViewedAt': utils.DateToKodi(utils.getUnixTimestamp())
})
for item in items:
itemFkt = getattr(itemtypes,
PF.GetItemClassFromType(item['kodi_type']))
with itemFkt() as Fkt:
Fkt.updatePlaystate(item)
def run(self):
try:
self.run_internal()
@ -1424,11 +1587,13 @@ class LibrarySync(Thread):
installSyncDone = self.installSyncDone
enableBackgroundSync = self.enableBackgroundSync
fullSync = self.fullSync
fastSync = self.fastSync
processMessage = self.processMessage
string = self.__language__
dialog = xbmcgui.Dialog()
queue = self.queue
startupComplete = False
self.views = []
count = 0
@ -1563,16 +1728,27 @@ class LibrarySync(Thread):
window('emby_dbScan', clear=True)
# Full library sync finished
self.showKodiNote(string(39407), forced=False)
# Run fast sync otherwise (ever second or so)
elif count % 300 == 0:
count += 1
self.process_newitems()
else:
window('emby_dbScan', value="true")
if not fastSync():
# Fast sync failed or server plugin is not found
log("Something went wrong, starting full sync", -1)
fullSync(manualrun=True)
window('emby_dbScan', clear=True)
count += 1
# See if there is a PMS message we need to handle
try:
message = queue.get(block=False)
# Empty queue
except Queue.Empty:
xbmc.sleep(100)
continue
# Got a message from PMS; process it
else:
window('emby_dbScan', value="true")
processMessage(message)
window('emby_dbScan', clear=True)
# NO sleep!
continue
xbmc.sleep(1000)
xbmc.sleep(100)
count += 1
log("###===--- LibrarySync Stopped ---===###", 0)

View file

@ -6,9 +6,11 @@ import utils
settings = {}
try:
guidoc = parse(xbmc.translatePath('special://userdata/guisettings.xml'))
path = xbmc.translatePath(
'special://userdata/guisettings.xml').decode('utf-8')
guidoc = parse(path)
except:
print "Unable to read XBMC's guisettings.xml"
print "PlexKodiConnect - Unable to read XBMC's guisettings.xml"
def getGUI(name):
global guidoc
@ -36,6 +38,7 @@ for entry in kodiSettingsList:
settings['client_name'] = plexbmc.getSetting('deviceName')
# XBMC web server settings
xbmc.sleep(5000)
settings['webserver_enabled'] = (getGUI('webserver') == "true")
settings['port'] = int(getGUI('webserverport'))
settings['user'] = getGUI('webserverusername')

View file

@ -46,6 +46,8 @@ import logging
import traceback
import sys
import xbmc
"""
websocket python client.
=========================
@ -728,9 +730,12 @@ class WebSocket(object):
except socket.timeout as e:
raise WebSocketTimeoutException(e.args[0])
except Exception as e:
if "timed out" in e.args[0]:
raise WebSocketTimeoutException(e.args[0])
else:
try:
if "timed out" in e.args[0]:
raise WebSocketTimeoutException(e.args[0])
else:
raise e
except:
raise e
def _recv(self, bufsize):
@ -879,6 +884,7 @@ class WebSocketApp(object):
#print str(e.args[0])
if "timed out" not in e.args[0]:
raise e
xbmc.sleep(100)
except Exception, e:
self._callback(self.on_error, e)

View file

@ -4,6 +4,7 @@
import json
import threading
import Queue
import websocket
import ssl
@ -26,20 +27,27 @@ logging.basicConfig()
@utils.logging
@utils.ThreadMethods
class WebSocket_Client(threading.Thread):
"""
websocket_client.WebSocket_Client(queue)
where (communication with librarysync)
queue: Queue object for background sync
"""
_shared_state = {}
client = None
stopWebsocket = False
def __init__(self):
def __init__(self, queue):
self.__dict__ = self._shared_state
# Communication with librarysync
self.queue = queue
self.doUtils = downloadutils.DownloadUtils()
self.clientInfo = clientinfo.ClientInfo()
self.deviceId = self.clientInfo.getDeviceId()
self.librarySync = librarysync.LibrarySync()
# 'state' that can be returned by PMS
self.timeStates = {
@ -77,23 +85,18 @@ class WebSocket_Client(threading.Thread):
"""
try:
message = json.loads(message)
except Exception as ex:
self.logMsg('Error decoding message from websocket: %s' % ex, -1)
self.logMsg(message, -1)
except Exception as e:
self.logMsg('Error decoding message from websocket: %s' % e, -1)
return False
typus = message.get('type')
if not typus:
return False
process_func = getattr(self, 'processing_%s' % typus, None)
if process_func and process_func(message):
# Put PMS message on queue and let libsync take care of it
try:
self.queue.put(message)
return True
# Something went wrong; log
self.logMsg("Error processing PMS websocket message.", -1)
self.logMsg("Received websocket message from PMS: %s" % message, -1)
return True
except Queue.Full:
# Queue only takes 100 messages. No worries if we miss one or two
self.logMsg('Queue is full, dropping PMS message', 0)
return False
def processing_playing(self, message):
"""

View file

@ -5,6 +5,7 @@
import os
import sys
from datetime import datetime
import Queue
import xbmc
import xbmcaddon
@ -109,10 +110,13 @@ class Service():
# Server auto-detect
initialsetup.InitialSetup().setup()
# Queue and lock for background sync
queue = Queue.LifoQueue(maxsize=100)
# Initialize important threads
user = userclient.UserClient()
ws = wsc.WebSocket_Client()
library = librarysync.LibrarySync()
ws = wsc.WebSocket_Client(queue)
library = librarysync.LibrarySync(queue)
kplayer = player.Player()
xplayer = xbmc.Player()
plx = PlexAPI.PlexAPI()
@ -307,14 +311,19 @@ class Service():
except:
xbmc.log('User client already shut down')
try:
downloadutils.DownloadUtils().stopSession()
except:
pass
log("======== STOP %s ========" % self.addonName, 0)
# Delay option
delay = int(utils.settings('startupDelay'))
xbmc.log("Delaying Plex startup by: %s sec..." % delay)
# Plex: add 5 seconds just for good measure
if delay and xbmc.Monitor().waitForAbort(delay+5):
# Plex: add 10 seconds just for good measure
if delay and xbmc.Monitor().waitForAbort(delay+10):
# Start the service
xbmc.log("Abort requested while waiting. Emby for kodi not started.")
else: