Plex websocket - groundworks

This commit is contained in:
tomkat83 2016-03-21 17:15:22 +01:00
parent 8b0ce0059c
commit bde5ed627b
2 changed files with 121 additions and 55 deletions

View file

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
################################################################################################# ###############################################################################
import json import json
import threading import threading
import websocket import websocket
import ssl
import xbmc import xbmc
import xbmcgui import xbmcgui
@ -19,9 +20,11 @@ import utils
import logging import logging
logging.basicConfig() logging.basicConfig()
################################################################################################# ###############################################################################
@utils.logging
@utils.ThreadMethods
class WebSocket_Client(threading.Thread): class WebSocket_Client(threading.Thread):
_shared_state = {} _shared_state = {}
@ -29,28 +32,29 @@ class WebSocket_Client(threading.Thread):
client = None client = None
stopWebsocket = False stopWebsocket = False
def __init__(self): def __init__(self):
self.__dict__ = self._shared_state self.__dict__ = self._shared_state
self.monitor = xbmc.Monitor()
self.doUtils = downloadutils.DownloadUtils() self.doUtils = downloadutils.DownloadUtils()
self.clientInfo = clientinfo.ClientInfo() self.clientInfo = clientinfo.ClientInfo()
self.addonName = self.clientInfo.getAddonName()
self.deviceId = self.clientInfo.getDeviceId() self.deviceId = self.clientInfo.getDeviceId()
self.librarySync = librarysync.LibrarySync() self.librarySync = librarysync.LibrarySync()
# 'state' that can be returned by PMS
self.timeStates = {
0: 'created',
2: 'matching',
3: 'downloading',
4: 'loading',
5: 'finished',
6: 'analyzing',
9: 'deleted'
}
threading.Thread.__init__(self) threading.Thread.__init__(self)
def logMsg(self, msg, lvl=1):
self.className = self.__class__.__name__
utils.logMsg("%s %s" % (self.addonName, self.className), msg, lvl)
def sendProgressUpdate(self, data): def sendProgressUpdate(self, data):
log = self.logMsg log = self.logMsg
log("sendProgressUpdate", 2) log("sendProgressUpdate", 2)
@ -68,18 +72,73 @@ class WebSocket_Client(threading.Thread):
log("Exception: %s" % e, 1) log("Exception: %s" % e, 1)
def on_message(self, ws, message): def on_message(self, ws, message):
"""
Will be called automatically if ws receives a message from PMS
"""
try:
message = json.loads(message)
except Exception as ex:
self.logMsg('Error decoding message from websocket: %s' % ex, -1)
self.logMsg(message, -1)
return False
log = self.logMsg typus = message.get('type')
window = utils.window if not typus:
lang = utils.language return False
result = json.loads(message) process_func = getattr(self, 'processing_%s' % typus, None)
messageType = result['MessageType'] if process_func and process_func(message):
data = result['Data'] return True
# Something went wrong; log
self.logMsg("Error processing PMS websocket message.", -1)
self.logMsg("Received websocket message from PMS: %s" % message, -1)
return True
def processing_playing(self, message):
"""
Called when somewhere a PMS item is started, being played, stopped.
Calls Libsync with a list of children dictionaries:
{
'_elementType': e.g. 'PlaySessionStateNotification'
'guid': e.g. ''
'key': e.g. '/library/metadata/282300',
'ratingKey': e.g. '282300',
'sessionKey': e.g. '590',
'state': e.g. 'playing', 'available', 'buffering',
'stopped'
'transcodeSession': e.g. 'yv50n9p4cr',
'url': e.g. ''
'viewOffset': e.g. 1878534 (INT!)
}
"""
children = message.get('_children')
if not children:
return False
def processing_progress(self, message):
"""
Called when a PMS items keeps getting played (resume points update)
"""
def processing_timeline(self, message):
"""
Called when a PMS is in the process or has updated/added/removed a
library item
"""
children = message.get('_children')
if not children:
return False
for item in children:
state = self.timeStates.get(item.get('state'))
return True
def processing_status(self, message):
"""
Called when a PMS is scanning its libraries (to be verified)
"""
if messageType not in ('SessionEnded'):
# Mute certain events
log("Message: %s" % message, 1)
if messageType == "Play": if messageType == "Play":
# A remote control play command has been sent from the server. # A remote control play command has been sent from the server.
@ -255,7 +314,7 @@ class WebSocket_Client(threading.Thread):
elif messageType == "ServerRestarting": elif messageType == "ServerRestarting":
if utils.settings('supressRestartMsg') == "true": if utils.settings('supressRestartMsg') == "true":
xbmcgui.Dialog().notification( xbmcgui.Dialog().notification(
heading="Emby for Kodi", heading=self.addonName,
message=lang(33006), message=lang(33006),
icon="special://home/addons/plugin.video.emby/icon.png") icon="special://home/addons/plugin.video.emby/icon.png")
@ -268,6 +327,7 @@ class WebSocket_Client(threading.Thread):
self.logMsg("Closed.", 2) self.logMsg("Closed.", 2)
def on_open(self, ws): def on_open(self, ws):
return
self.doUtils.postCapabilities(self.deviceId) self.doUtils.postCapabilities(self.deviceId)
def on_error(self, ws, error): def on_error(self, ws, error):
@ -281,14 +341,12 @@ class WebSocket_Client(threading.Thread):
log = self.logMsg log = self.logMsg
window = utils.window window = utils.window
monitor = self.monitor
loglevel = int(window('emby_logLevel'))
# websocket.enableTrace(True) # websocket.enableTrace(True)
userId = window('emby_currUser') userId = window('currUserId')
server = window('emby_server%s' % userId) server = window('pms_server')
token = window('emby_accessToken%s' % userId) token = window('pms_token')
deviceId = self.deviceId deviceId = self.deviceId
# Get the appropriate prefix for the websocket # Get the appropriate prefix for the websocket
@ -297,31 +355,35 @@ class WebSocket_Client(threading.Thread):
else: else:
server = server.replace('http', "ws") server = server.replace('http', "ws")
websocket_url = "%s?api_key=%s&deviceId=%s" % (server, token, deviceId) websocket_url = "%s/:/websockets/notifications" % server
if token:
websocket_url += '?X-Plex-Token=%s' % token
log("websocket url: %s" % websocket_url, 1) log("websocket url: %s" % websocket_url, 1)
self.client = websocket.WebSocketApp(websocket_url, sslopt = {}
on_message=self.on_message, if utils.settings('sslverify') == "false":
on_error=self.on_error, sslopt["cert_reqs"] = ssl.CERT_NONE
on_close=self.on_close)
self.client = websocket.WebSocketApp(
websocket_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.client.on_open = self.on_open self.client.on_open = self.on_open
log("----===## Starting WebSocketClient ##===----", 0) log("----===## Starting WebSocketClient ##===----", 0)
while not monitor.abortRequested(): while not self.threadStopped():
self.client.run_forever(ping_interval=10,
self.client.run_forever(ping_interval=10) sslopt=sslopt)
if self.stopWebsocket: xbmc.sleep(100)
break
if monitor.waitForAbort(5):
# Abort was requested, exit
break
log("##===---- WebSocketClient Stopped ----===##", 0) log("##===---- WebSocketClient Stopped ----===##", 0)
def stopClient(self): def stopThread(self):
"""
self.stopWebsocket = True Overwrite this method from ThreadMethods to close websockets first
"""
self.logMsg("Stopping websocket client thread.", 1)
self.client.close() self.client.close()
self.logMsg("Stopping thread.", 1) self._threadStopped = True

View file

@ -27,6 +27,7 @@ import librarysync
import player import player
import utils import utils
import videonodes import videonodes
import websocket_client as wsc
import PlexAPI import PlexAPI
import PlexCompanion import PlexCompanion
@ -106,7 +107,7 @@ class Service():
# Initialize important threads # Initialize important threads
user = userclient.UserClient() user = userclient.UserClient()
# ws = wsc.WebSocket_Client() ws = wsc.WebSocket_Client()
library = librarysync.LibrarySync() library = librarysync.LibrarySync()
kplayer = player.Player() kplayer = player.Player()
xplayer = xbmc.Player() xplayer = xbmc.Player()
@ -184,9 +185,9 @@ class Service():
self.kodimonitor_running = kodimonitor.KodiMonitor() self.kodimonitor_running = kodimonitor.KodiMonitor()
# Start the Websocket Client # Start the Websocket Client
# if not self.websocket_running: if not self.websocket_running:
# self.websocket_running = True self.websocket_running = True
# ws.start() ws.start()
# Start the syncing thread # Start the syncing thread
if not self.library_running: if not self.library_running:
self.library_running = True self.library_running = True
@ -296,8 +297,11 @@ class Service():
except: except:
xbmc.log('Library sync already shut down') xbmc.log('Library sync already shut down')
# if self.websocket_running: try:
# ws.stopClient() ws.stopThread()
except:
xbmc.log('Websocket client already shut down')
try: try:
if self.userclient_running: if self.userclient_running:
user.stopThread() user.stopThread()