From b734d0be8ef13f654d7642b30f254f741f6a1e58 Mon Sep 17 00:00:00 2001 From: tomkat83 Date: Sat, 4 Mar 2017 17:54:24 +0100 Subject: [PATCH] Amazon Alexa support! --- default.py | 3 +- resources/lib/PlexCompanion.py | 21 +- resources/lib/clientinfo.py | 2 +- resources/lib/companion.py | 193 +++++++++++++++ resources/lib/entrypoint.py | 23 +- resources/lib/plexbmchelper/listener.py | 139 ++--------- resources/lib/variables.py | 13 + resources/lib/websocket_client.py | 303 ++++++++++++++---------- service.py | 14 +- 9 files changed, 456 insertions(+), 255 deletions(-) create mode 100644 resources/lib/companion.py diff --git a/default.py b/default.py index 3c733577..bfc9b517 100644 --- a/default.py +++ b/default.py @@ -161,8 +161,7 @@ class Main(): modes[mode](itemid, params=argv[2]) elif mode == 'Plex_Node': modes[mode](params.get('id'), - params.get('viewOffset'), - params.get('plex_type')) + params.get('viewOffset')) else: modes[mode]() else: diff --git a/resources/lib/PlexCompanion.py b/resources/lib/PlexCompanion.py index 03780c49..7712f31c 100644 --- a/resources/lib/PlexCompanion.py +++ b/resources/lib/PlexCompanion.py @@ -6,14 +6,17 @@ from socket import SHUT_RDWR from xbmc import sleep -from utils import settings, ThreadMethodsAdditionalSuspend, ThreadMethods +from utils import settings, ThreadMethodsAdditionalSuspend, ThreadMethods, \ + window from plexbmchelper import listener, plexgdm, subscribers, functions, \ httppersist, plexsettings from PlexFunctions import ParseContainerKey, GetPlexMetadata from PlexAPI import API import player from entrypoint import Plex_Node -from variables import KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE +from variables import KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE, \ + PLEX_TO_KODI_TIMEFACTOR + ############################################################################### @@ -83,19 +86,28 @@ class PlexCompanion(Thread): thread = Thread(target=Plex_Node, args=('{server}%s' % data.get('key'), data.get('offset'), - data.get('type'), True),) thread.setDaemon(True) thread.start() elif task['action'] == 'playlist': # Get the playqueue ID try: - _, ID, query = ParseContainerKey(data['containerKey']) + typus, ID, query = ParseContainerKey(data['containerKey']) except Exception as e: log.error('Exception while processing: %s' % e) import traceback log.error("Traceback:\n%s" % traceback.format_exc()) return + if typus == 'library/metadata': + # e.g. Alexa + thread = Thread(target=Plex_Node, + args=('{server}%s' % data.get('key'), + data.get('offset'), + True, + False),) + thread.setDaemon(True) + thread.start() + return try: playqueue = self.mgr.playqueue.get_playqueue_from_type( KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE[data['type']]) @@ -126,6 +138,7 @@ class PlexCompanion(Thread): jsonClass, requestMgr, self.player, self.mgr) queue = Queue.Queue(maxsize=100) + self.queue = queue if settings('plexCompanion') == 'true': # Start up httpd diff --git a/resources/lib/clientinfo.py b/resources/lib/clientinfo.py index 8568930c..91cb6b91 100644 --- a/resources/lib/clientinfo.py +++ b/resources/lib/clientinfo.py @@ -39,7 +39,7 @@ def getXArgsDeviceInfo(options=None): 'X-Plex-Product': v.ADDON_NAME, 'X-Plex-Version': v.ADDON_VERSION, 'X-Plex-Client-Identifier': getDeviceId(), - 'X-Plex-Provides': 'client,controller,player', + 'X-Plex-Provides': 'client,controller,player,pubsub-player', } if window('pms_token'): xargs['X-Plex-Token'] = window('pms_token') diff --git a/resources/lib/companion.py b/resources/lib/companion.py new file mode 100644 index 00000000..5791faf1 --- /dev/null +++ b/resources/lib/companion.py @@ -0,0 +1,193 @@ +# -*- coding: utf-8 -*- +import logging +from urlparse import urlparse +from re import compile as re_compile + +from utils import JSONRPC +import plexdb_functions as plexdb +from variables import ALEXA_TO_COMPANION + +############################################################################### + +log = logging.getLogger("PLEX."+__name__) + +REGEX_PLAYQUEUES = re_compile(r'''/playQueues/(\d+)$''') + +############################################################################### + + +def getPlayers(): + info = JSONRPC("Player.GetActivePlayers").execute()['result'] or [] + log.debug('players: %s' % JSONRPC("Player.GetActivePlayers").execute()) + ret = {} + for player in info: + player['playerid'] = int(player['playerid']) + ret[player['type']] = player + return ret + + +def getPlayerIds(): + ret = [] + for player in getPlayers().values(): + ret.append(player['playerid']) + return ret + + +def getPlaylistId(typus): + """ + typus: one of the Kodi types, e.g. audio or video + + Returns None if nothing was found + """ + for playlist in getPlaylists(): + if playlist.get('type') == typus: + return playlist.get('playlistid') + + +def getPlaylists(): + """ + Returns a list, e.g. + [ + {u'playlistid': 0, u'type': u'audio'}, + {u'playlistid': 1, u'type': u'video'}, + {u'playlistid': 2, u'type': u'picture'} + ] + """ + return JSONRPC('Playlist.GetPlaylists').execute() + + +def millisToTime(t): + millis = int(t) + seconds = millis / 1000 + minutes = seconds / 60 + hours = minutes / 60 + seconds = seconds % 60 + minutes = minutes % 60 + millis = millis % 1000 + return {'hours': hours, + 'minutes': minutes, + 'seconds': seconds, + 'milliseconds': millis} + + +def skipTo(self, plexId, typus): + # playlistId = self.getPlaylistId(tryDecode(xbmc_type(typus))) + # playerId = self. + with plexdb.Get_Plex_DB() as plex_db: + plexdb_item = plex_db.getItem_byId(plexId) + try: + dbid = plexdb_item[0] + mediatype = plexdb_item[4] + except TypeError: + log.info('Couldnt find item %s in Kodi db' % plexId) + return + log.debug('plexid: %s, kodi id: %s, type: %s' + % (plexId, dbid, mediatype)) + + +def convert_alexa_to_companion(dictionary): + for key in dictionary: + if key in ALEXA_TO_COMPANION: + dictionary[ALEXA_TO_COMPANION[key]] = dictionary[key] + del dictionary[key] + + +def process_command(request_path, params, queue=None): + """ + queue: Queue() of PlexCompanion.py + """ + if params.get('deviceName') == 'Alexa': + convert_alexa_to_companion(params) + log.debug('Received request_path: %s, params: %s' % (request_path, params)) + if "/playMedia" in request_path: + try: + containerKey = urlparse(params.get('containerKey')).path + except: + containerKey = '' + try: + playQueueID = REGEX_PLAYQUEUES.findall(containerKey)[0] + except IndexError: + playQueueID = '' + # We need to tell service.py + queue.put({ + 'action': 'playlist', + 'data': params + }) + return { + 'lastkey': params['key'], + 'containerKey': containerKey, + 'playQueueID': playQueueID, + } + + elif request_path == "player/playback/setParameters": + if 'volume' in params: + volume = int(params['volume']) + log.debug("Adjusting the volume to %s" % volume) + JSONRPC('Application.SetVolume').execute({"volume": volume}) + + elif request_path == "player/playback/play": + for playerid in getPlayerIds(): + JSONRPC("Player.PlayPause").execute({"playerid": playerid, + "play": True}) + + elif request_path == "player/playback/pause": + for playerid in getPlayerIds(): + JSONRPC("Player.PlayPause").execute({"playerid": playerid, + "play": False}) + + elif request_path == "player/playback/stop": + for playerid in getPlayerIds(): + JSONRPC("Player.Stop").execute({"playerid": playerid}) + + elif request_path == "player/playback/seekTo": + for playerid in getPlayerIds(): + JSONRPC("Player.Seek").execute( + {"playerid": playerid, + "value": millisToTime(params.get('offset', 0))}) + + elif request_path == "player/playback/stepForward": + for playerid in getPlayerIds(): + JSONRPC("Player.Seek").execute({"playerid": playerid, + "value": "smallforward"}) + + elif request_path == "player/playback/stepBack": + for playerid in getPlayerIds(): + JSONRPC("Player.Seek").execute({"playerid": playerid, + "value": "smallbackward"}) + + elif request_path == "player/playback/skipNext": + for playerid in getPlayerIds(): + JSONRPC("Player.GoTo").execute({"playerid": playerid, + "to": "next"}) + + elif request_path == "player/playback/skipPrevious": + for playerid in getPlayerIds(): + JSONRPC("Player.GoTo").execute({"playerid": playerid, + "to": "previous"}) + + elif request_path == "player/playback/skipTo": + skipTo(params.get('key').rsplit('/', 1)[1], params.get('type')) + + elif request_path == "player/navigation/moveUp": + JSONRPC("Input.Up").execute() + + elif request_path == "player/navigation/moveDown": + JSONRPC("Input.Down").execute() + + elif request_path == "player/navigation/moveLeft": + JSONRPC("Input.Left").execute() + + elif request_path == "player/navigation/moveRight": + JSONRPC("Input.Right").execute() + + elif request_path == "player/navigation/select": + JSONRPC("Input.Select").execute() + + elif request_path == "player/navigation/home": + JSONRPC("Input.Home").execute() + + elif request_path == "player/navigation/back": + JSONRPC("Input.Back").execute() + + else: + log.error('Unknown request path: %s' % request_path) diff --git a/resources/lib/entrypoint.py b/resources/lib/entrypoint.py index cd4f70dc..2aa33cd3 100644 --- a/resources/lib/entrypoint.py +++ b/resources/lib/entrypoint.py @@ -14,6 +14,7 @@ from utils import window, settings, language as lang, dialog, tryDecode,\ tryEncode, CatchExceptions, JSONRPC import downloadutils import playbackutils as pbutils +import plexdb_functions as plexdb from PlexFunctions import GetPlexMetadata, GetPlexSectionResults, \ GetMachineIdentifier @@ -96,7 +97,7 @@ def togglePlexTV(): sound=False) -def Plex_Node(url, viewOffset, plex_type, playdirectly=False): +def Plex_Node(url, viewOffset, playdirectly=False, node=True): """ Called only for a SINGLE element for Plex.tv watch later @@ -120,11 +121,25 @@ def Plex_Node(url, viewOffset, plex_type, playdirectly=False): else: window('plex_customplaylist.seektime', value=str(viewOffset)) log.info('Set resume point to %s' % str(viewOffset)) - typus = v.KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE[plex_type] + api = API(xml[0]) + typus = v.KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE[api.getType()] + if node is True: + plex_id = None + kodi_id = 'plexnode' + else: + plex_id = api.getRatingKey() + kodi_id = None + with plexdb.Get_Plex_DB() as plex_db: + plexdb_item = plex_db.getItem_byId(plex_id) + try: + kodi_id = plexdb_item[0] + except TypeError: + log.info('Couldnt find item %s in Kodi db' + % api.getRatingKey()) playqueue = Playqueue().get_playqueue_from_type(typus) result = pbutils.PlaybackUtils(xml, playqueue).play( - None, - kodi_id='plexnode', + plex_id, + kodi_id=kodi_id, plex_lib_UUID=xml.attrib.get('librarySectionUUID')) if result.listitem: listitem = convert_PKC_to_listitem(result.listitem) diff --git a/resources/lib/plexbmchelper/listener.py b/resources/lib/plexbmchelper/listener.py index de63b8cc..f240ea1c 100644 --- a/resources/lib/plexbmchelper/listener.py +++ b/resources/lib/plexbmchelper/listener.py @@ -1,11 +1,13 @@ # -*- coding: utf-8 -*- import logging -import re +from re import sub from SocketServer import ThreadingMixIn from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler from urlparse import urlparse, parse_qs from xbmc import sleep +from companion import process_command +from utils import window from functions import * @@ -19,7 +21,6 @@ log = logging.getLogger("PLEX."+__name__) class MyHandler(BaseHTTPRequestHandler): protocol_version = 'HTTP/1.1' - regex = re.compile(r'''/playQueues/(\d+)$''') def __init__(self, *args, **kwargs): BaseHTTPRequestHandler.__init__(self, *args, **kwargs) @@ -83,11 +84,10 @@ class MyHandler(BaseHTTPRequestHandler): subMgr = self.server.subscriptionManager js = self.server.jsonClass settings = self.server.settings - queue = self.server.queue try: request_path = self.path[1:] - request_path = re.sub(r"\?.*", "", request_path) + request_path = sub(r"\?.*", "", request_path) url = urlparse(self.path) paramarrays = parse_qs(url.query) params = {} @@ -145,9 +145,9 @@ class MyHandler(BaseHTTPRequestHandler): sleep(950) commandID = params.get('commandID', 0) self.response( - re.sub(r"INSERTCOMMANDID", - str(commandID), - subMgr.msg(js.getPlayers())), + sub(r"INSERTCOMMANDID", + str(commandID), + subMgr.msg(js.getPlayers())), { 'X-Plex-Client-Identifier': settings['uuid'], 'Access-Control-Expose-Headers': @@ -160,121 +160,18 @@ class MyHandler(BaseHTTPRequestHandler): uuid = self.headers.get('X-Plex-Client-Identifier', False) \ or self.client_address[0] subMgr.removeSubscriber(uuid) - elif request_path == "player/playback/setParameters": - self.response(getOKMsg(), js.getPlexHeaders()) - if 'volume' in params: - volume = int(params['volume']) - log.debug("adjusting the volume to %s%%" % volume) - js.jsonrpc("Application.SetVolume", - {"volume": volume}) - elif "/playMedia" in request_path: - self.response(getOKMsg(), js.getPlexHeaders()) - offset = params.get('viewOffset', params.get('offset', "0")) - protocol = params.get('protocol', "http") - address = params.get('address', self.client_address[0]) - server = self.getServerByHost(address) - port = params.get('port', server.get('port', '32400')) - try: - containerKey = urlparse(params.get('containerKey')).path - except: - containerKey = '' - try: - playQueueID = self.regex.findall(containerKey)[0] - except IndexError: - playQueueID = '' - # We need to tell service.py - queue.put({ - 'action': 'playlist', - 'data': params - }) - subMgr.lastkey = params['key'] - subMgr.containerKey = containerKey - subMgr.playQueueID = playQueueID - subMgr.server = server.get('server', 'localhost') - subMgr.port = port - subMgr.protocol = protocol - subMgr.notify() - elif request_path == "player/playback/play": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.PlayPause", - {"playerid": playerid, "play": True}) - subMgr.notify() - elif request_path == "player/playback/pause": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.PlayPause", - {"playerid": playerid, "play": False}) - subMgr.notify() - elif request_path == "player/playback/stop": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.Stop", {"playerid": playerid}) - subMgr.notify() - elif request_path == "player/playback/seekTo": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.Seek", - {"playerid": playerid, - "value": millisToTime( - params.get('offset', 0))}) - subMgr.notify() - elif request_path == "player/playback/stepForward": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.Seek", - {"playerid": playerid, - "value": "smallforward"}) - subMgr.notify() - elif request_path == "player/playback/stepBack": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.Seek", - {"playerid": playerid, - "value": "smallbackward"}) - subMgr.notify() - elif request_path == "player/playback/skipNext": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.GoTo", - {"playerid": playerid, - "to": "next"}) - subMgr.notify() - elif request_path == "player/playback/skipPrevious": - self.response(getOKMsg(), js.getPlexHeaders()) - for playerid in js.getPlayerIds(): - js.jsonrpc("Player.GoTo", - {"playerid": playerid, - "to": "previous"}) - subMgr.notify() - elif request_path == "player/playback/skipTo": - js.skipTo(params.get('key').rsplit('/', 1)[1], - params.get('type')) - subMgr.notify() - elif request_path == "player/navigation/moveUp": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Up") - elif request_path == "player/navigation/moveDown": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Down") - elif request_path == "player/navigation/moveLeft": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Left") - elif request_path == "player/navigation/moveRight": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Right") - elif request_path == "player/navigation/select": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Select") - elif request_path == "player/navigation/home": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Home") - elif request_path == "player/navigation/back": - self.response(getOKMsg(), js.getPlexHeaders()) - js.jsonrpc("Input.Back") else: - log.error('Unknown request path: %s' % request_path) - + # Throw it to companion.py + answ = process_command(request_path, params, self.server.queue) + self.response(getOKMsg(), js.getPlexHeaders()) + subMgr.notify() + if answ is not None: + subMgr.lastkey = answ['lastkey'] + subMgr.containerKey = answ['containerKey'] + subMgr.playQueueID = answ['playQueueID'] + subMgr.protocol, subMgr.server, subMgr.port = \ + window('pms_server').split(':', 2) + subMgr.server = subMgr.server.replace('/', '') except: log.error('Error encountered. Traceback:') import traceback diff --git a/resources/lib/variables.py b/resources/lib/variables.py index ff9a1126..28b34278 100644 --- a/resources/lib/variables.py +++ b/resources/lib/variables.py @@ -248,3 +248,16 @@ KODI_SUPPORTED_IMAGES = ( '.pcx', '.tga' ) + + +# Translation table from Alexa websocket commands to Plex Companion +ALEXA_TO_COMPANION = { + 'queryKey': 'key', + 'queryOffset': 'offset', + 'queryMachineIdentifier': 'machineIdentifier', + 'queryProtocol': 'protocol', + 'queryAddress': 'address', + 'queryPort': 'port', + 'queryContainerKey': 'containerKey', + 'queryToken': 'token', +} diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index 312080b5..1d19d021 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -4,6 +4,7 @@ import logging import websocket from json import loads +import xml.etree.ElementTree as etree from threading import Thread from Queue import Queue from ssl import CERT_NONE @@ -12,6 +13,7 @@ from xbmc import sleep from utils import window, settings, ThreadMethodsAdditionalSuspend, \ ThreadMethods +from companion import process_command ############################################################################### @@ -29,10 +31,151 @@ class WebSocket(Thread): if callback is not None: self.mgr = callback self.ws = None - # Communication with librarysync - self.queue = Queue() Thread.__init__(self) + def process(self, opcode, message): + raise NotImplementedError + + def receive(self, ws): + # Not connected yet + if ws is None: + raise websocket.WebSocketConnectionClosedException + + frame = ws.recv_frame() + + if not frame: + raise websocket.WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in self.opcode_data: + return frame.opcode, frame.data + elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: + ws.send_close() + return frame.opcode, None + elif frame.opcode == websocket.ABNF.OPCODE_PING: + ws.pong("Hi!") + return None, None + + def getUri(self): + raise NotImplementedError + + def run(self): + log.info("----===## Starting %s ##===----" % self.__class__.__name__) + + counter = 0 + handshake_counter = 0 + threadStopped = self.threadStopped + threadSuspended = self.threadSuspended + while not threadStopped(): + # In the event the server goes offline + while threadSuspended(): + # Set in service.py + if self.ws is not None: + try: + self.ws.shutdown() + except: + pass + self.ws = None + if threadStopped(): + # Abort was requested while waiting. We should exit + log.info("##===---- %s Stopped ----===##" + % self.__class__.__name__) + return + sleep(1000) + try: + self.process(*self.receive(self.ws)) + except websocket.WebSocketTimeoutException: + # No worries if read timed out + pass + except websocket.WebSocketConnectionClosedException: + log.info("Connection closed, (re)connecting") + uri, sslopt = self.getUri() + try: + # Low timeout - let's us shut this thread down! + self.ws = websocket.create_connection( + uri, + timeout=1, + sslopt=sslopt, + enable_multithread=True) + except IOError: + # Server is probably offline + log.info("Error connecting") + self.ws = None + counter += 1 + if counter > 3: + counter = 0 + self.IOError_response() + sleep(1000) + except websocket.WebSocketTimeoutException: + log.info("timeout while connecting, trying again") + self.ws = None + sleep(1000) + except websocket.WebSocketException as e: + log.info('WebSocketException: %s' % e) + if 'Handshake Status 401' in e.args: + handshake_counter += 1 + if handshake_counter >= 5: + log.info('Error in handshake detected. Stopping ' + '%s now' % self.__class__.__name__) + break + self.ws = None + sleep(1000) + except Exception as e: + log.error("Unknown exception encountered in connecting: %s" + % e) + import traceback + log.error("Traceback:\n%s" % traceback.format_exc()) + self.ws = None + sleep(1000) + else: + counter = 0 + handshake_counter = 0 + except Exception as e: + log.error("Unknown exception encountered: %s" % e) + import traceback + log.error("Traceback:\n%s" % traceback.format_exc()) + try: + self.ws.shutdown() + except: + pass + self.ws = None + log.info("##===---- %s Stopped ----===##" % self.__class__.__name__) + + def stopThread(self): + """ + Overwrite this method from ThreadMethods to close websockets + """ + log.info("Stopping %s thread." % self.__class__.__name__) + self._threadStopped = True + try: + self.ws.shutdown() + except: + pass + + +class PMS_Websocket(WebSocket): + """ + Websocket connection with the PMS for Plex Companion + """ + # Communication with librarysync + queue = Queue() + + def getUri(self): + server = window('pms_server') + # Need to use plex.tv token, if any. NOT user token + token = window('plex_token') + # Get the appropriate prefix for the websocket + if server.startswith('https'): + server = "wss%s" % server[5:] + else: + server = "ws%s" % server[4:] + uri = "%s/:/websockets/notifications" % server + if token: + uri += '?X-Plex-Token=%s' % token + sslopt = {} + if settings('sslverify') == "false": + sslopt["cert_reqs"] = CERT_NONE + log.debug("Uri: %s, sslopt: %s" % (uri, sslopt)) + return uri, sslopt + def process(self, opcode, message): if opcode not in self.opcode_data: return False @@ -62,131 +205,49 @@ class WebSocket(Thread): self.queue.put(message) return True - def receive(self, ws): - # Not connected yet - if ws is None: - raise websocket.WebSocketConnectionClosedException + def IOError_response(self): + log.warn("Repeatedly could not connect to PMS, " + "declaring the connection dead") + window('plex_online', value='false') - frame = ws.recv_frame() - - if not frame: - raise websocket.WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in self.opcode_data: - return frame.opcode, frame.data - elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: - ws.send_close() - return frame.opcode, None - elif frame.opcode == websocket.ABNF.OPCODE_PING: - ws.pong("Hi!") - return None, None +class Alexia_Websocket(WebSocket): + """ + Websocket connection to talk to Amazon Alexia + """ def getUri(self): - server = window('pms_server') - # Need to use plex.tv token, if any. NOT user token - token = window('plex_token') - # Get the appropriate prefix for the websocket - if server.startswith('https'): - server = "wss%s" % server[5:] - else: - server = "ws%s" % server[4:] - uri = "%s/:/websockets/notifications" % server - if token: - uri += '?X-Plex-Token=%s' % token + self.plex_client_Id = window('plex_client_Id') + uri = ('wss://pubsub.plex.tv/sub/websockets/%s/%s?X-Plex-Token=%s' + % (window('currUserId'), + self.plex_client_Id, + window('plex_token'))) sslopt = {} - if settings('sslverify') == "false": - sslopt["cert_reqs"] = CERT_NONE log.debug("Uri: %s, sslopt: %s" % (uri, sslopt)) return uri, sslopt - def run(self): - log.info("----===## Starting WebSocketClient ##===----") - - counter = 0 - handshake_counter = 0 - threadStopped = self.threadStopped - threadSuspended = self.threadSuspended - while not threadStopped(): - # In the event the server goes offline - while threadSuspended(): - # Set in service.py - if self.ws is not None: - try: - self.ws.shutdown() - except: - pass - self.ws = None - if threadStopped(): - # Abort was requested while waiting. We should exit - log.info("##===---- WebSocketClient Stopped ----===##") - return - sleep(1000) - try: - self.process(*self.receive(self.ws)) - except websocket.WebSocketTimeoutException: - # No worries if read timed out - pass - except websocket.WebSocketConnectionClosedException: - log.info("Connection closed, (re)connecting") - uri, sslopt = self.getUri() - try: - # Low timeout - let's us shut this thread down! - self.ws = websocket.create_connection( - uri, - timeout=1, - sslopt=sslopt, - enable_multithread=True) - except IOError: - # Server is probably offline - log.info("Error connecting") - self.ws = None - counter += 1 - if counter > 3: - log.warn("Repeatedly could not connect to PMS, " - "declaring the connection dead") - window('plex_online', value='false') - counter = 0 - sleep(1000) - except websocket.WebSocketTimeoutException: - log.info("timeout while connecting, trying again") - self.ws = None - sleep(1000) - except websocket.WebSocketException as e: - log.info('WebSocketException: %s' % e) - if 'Handshake Status 401' in e.args: - handshake_counter += 1 - if handshake_counter >= 5: - log.info('Error in handshake detected. Stopping ' - 'WebSocketClient now') - break - self.ws = None - sleep(1000) - except Exception as e: - log.error("Unknown exception encountered in connecting: %s" - % e) - import traceback - log.error("Traceback:\n%s" % traceback.format_exc()) - self.ws = None - sleep(1000) - else: - counter = 0 - handshake_counter = 0 - except Exception as e: - log.error("Unknown exception encountered: %s" % e) - try: - self.ws.shutdown() - except: - pass - self.ws = None - - log.info("##===---- WebSocketClient Stopped ----===##") - - def stopThread(self): - """ - Overwrite this method from ThreadMethods to close websockets - """ - log.info("Stopping websocket client thread.") - self._threadStopped = True + def process(self, opcode, message): + if opcode not in self.opcode_data: + return False + log.debug('Received the following message from Alexia:') + log.debug(message) try: - self.ws.shutdown() + message = etree.fromstring(message) + except Exception as ex: + log.error('Error decoding message from Alexa: %s' % ex) + return False + try: + if message.attrib['command'] == 'processRemoteControlCommand': + message = message[0] + else: + log.error('Unknown Alexa message received') + return False except: - pass + log.error('Could not parse Alexia message') + return False + process_command(message.attrib['path'][1:], + message.attrib, + queue=self.mgr.plexCompanion.queue) + return True + + def IOError_response(self): + pass diff --git a/service.py b/service.py index 131d6189..b9afc73d 100644 --- a/service.py +++ b/service.py @@ -36,7 +36,7 @@ import initialsetup from kodimonitor import KodiMonitor from librarysync import LibrarySync import videonodes -from websocket_client import WebSocket +from websocket_client import PMS_Websocket, Alexia_Websocket import downloadutils from playqueue import Playqueue @@ -70,6 +70,7 @@ class Service(): user_running = False ws_running = False + alexia_running = False library_running = False plexCompanion_running = False playqueue_running = False @@ -148,7 +149,8 @@ class Service(): # Initialize important threads, handing over self for callback purposes self.user = UserClient(self) - self.ws = WebSocket(self) + self.ws = PMS_Websocket(self) + self.alexia = Alexia_Websocket(self) self.library = LibrarySync(self) self.plexCompanion = PlexCompanion(self) self.playqueue = Playqueue(self) @@ -201,6 +203,10 @@ class Service(): if not self.ws_running: self.ws_running = True self.ws.start() + # Start the Alexia thread + if not self.alexia_running: + self.alexia_running = True + self.alexia.start() # Start the syncing thread if not self.library_running: self.library_running = True @@ -326,6 +332,10 @@ class Service(): self.ws.stopThread() except: log.warn('Websocket client already shut down') + try: + self.alexia.stopThread() + except: + log.warn('Websocket client already shut down') try: self.user.stopThread() except: