From dc7755ba6da8aa6ed58e6759c58482166383252e Mon Sep 17 00:00:00 2001 From: tomkat83 Date: Tue, 29 Mar 2016 18:44:13 +0200 Subject: [PATCH] Rewired websockets --- resources/lib/PlexCompanion.py | 2 +- resources/lib/websocket.py | 22 +- resources/lib/websocket_client.py | 370 +++++++----------------------- service.py | 7 +- 4 files changed, 95 insertions(+), 306 deletions(-) diff --git a/resources/lib/PlexCompanion.py b/resources/lib/PlexCompanion.py index bd35daa1..1a2b8f3d 100644 --- a/resources/lib/PlexCompanion.py +++ b/resources/lib/PlexCompanion.py @@ -103,4 +103,4 @@ class PlexCompanion(threading.Thread): pass finally: httpd.socket.close() - self.logMsg("----===## STOP Plex Companion ##===----", 0) + self.logMsg("----===## Plex Companion stopped ##===----", 0) diff --git a/resources/lib/websocket.py b/resources/lib/websocket.py index b3352720..3d777a97 100644 --- a/resources/lib/websocket.py +++ b/resources/lib/websocket.py @@ -46,8 +46,6 @@ import logging import traceback import sys -import xbmc - """ websocket python client. ========================= @@ -390,9 +388,6 @@ class WebSocket(object): self._frame_mask = None self._cont_data = None - # Do not allow simultaneous send - leads to SSL issues! - self.lock = threading.Lock() - def fileno(self): return self.sock.fileno() @@ -568,10 +563,9 @@ class WebSocket(object): length = len(data) if traceEnabled: logger.debug("send: " + repr(data)) - with self.lock: - while data: - l = self._send(data) - data = data[l:] + while data: + l = self._send(data) + data = data[l:] return length def send_binary(self, payload): @@ -734,12 +728,9 @@ class WebSocket(object): except socket.timeout as e: raise WebSocketTimeoutException(e.args[0]) except Exception as e: - try: - if "timed out" in e.args[0]: - raise WebSocketTimeoutException(e.args[0]) - else: - raise e - except: + if "timed out" in e.args[0]: + raise WebSocketTimeoutException(e.args[0]) + else: raise e def _recv(self, bufsize): @@ -888,7 +879,6 @@ class WebSocketApp(object): #print str(e.args[0]) if "timed out" not in e.args[0]: raise e - xbmc.sleep(100) except Exception, e: self._callback(self.on_error, e) diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index a4fb49df..d9aaa064 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -9,314 +9,87 @@ import websocket import ssl import xbmc -import xbmcgui -import clientinfo -import downloadutils -import playlist -import userclient import utils -import logging -logging.basicConfig() ############################################################################### +TIMELINE_STATES = { + 0: 'created', + 2: 'matching', + 3: 'downloading', + 4: 'loading', + 5: 'finished', + 6: 'analyzing', + 9: 'deleted' +} + + @utils.logging @utils.ThreadMethods -class WebSocket_Client(threading.Thread): - """ - websocket_client.WebSocket_Client(queue) - - where (communication with librarysync) - queue: Queue object for background sync - """ - _shared_state = {} - - client = None - stopWebsocket = False +class WebSocket(threading.Thread): + opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY) def __init__(self, queue): - - self.__dict__ = self._shared_state - + self.ws = None # Communication with librarysync self.queue = queue - - self.doUtils = downloadutils.DownloadUtils() - self.clientInfo = clientinfo.ClientInfo() - self.deviceId = self.clientInfo.getDeviceId() - threading.Thread.__init__(self) - def sendProgressUpdate(self, data): - log = self.logMsg + def process(self, opcode, message): + if opcode not in self.opcode_data: + return False - log("sendProgressUpdate", 2) - try: - messageData = { - - 'MessageType': "ReportPlaybackProgress", - 'Data': data - } - messageString = json.dumps(messageData) - self.client.send(messageString) - log("Message data: %s" % messageString, 2) - - except Exception as e: - log("Exception: %s" % e, 1) - - def on_message(self, ws, message): - """ - Will be called automatically if ws receives a message from PMS - """ try: message = json.loads(message) - except Exception as e: - self.logMsg('Error decoding message from websocket: %s' % e, -1) + except Exception as ex: + self.logMsg('Error decoding message from websocket: %s' % ex, -1) + self.logMsg(message, -1) return False # Triage - self.logMsg('Message received: %s' % message, 2) typus = message.get('type') if typus is None: self.logMsg('No message type, dropping message: %s' % message, -1) return False # Drop everything we're not interested in if typus not in ('playing', 'timeline'): - return + return True + # Put PMS message on queue and let libsync take care of it try: self.queue.put(message) return True except Queue.Full: - # Queue only takes 100 messages. No worries if we miss one or two - self.logMsg('Queue is full, dropping PMS message', 0) + # Queue only takes 200 messages. No worries if we miss one or two + self.logMsg('Queue is full, dropping PMS message %s' % message, 0) return False - def on_message_LEGACY_EMBY(self, ws, message): - - log = self.logMsg - window = utils.window - lang = utils.language + def receive(self, ws): + # Not connected yet + if ws is None: + raise websocket.WebSocketConnectionClosedException - result = json.loads(message) - messageType = result['MessageType'] - data = result['Data'] + frame = ws.recv_frame() - if messageType not in ('SessionEnded'): - # Mute certain events - log("Message: %s" % message, 1) + if not frame: + raise websocket.WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in self.opcode_data: + return frame.opcode, frame.data + elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: + ws.send_close() + return frame.opcode, None + elif frame.opcode == websocket.ABNF.OPCODE_PING: + ws.pong("Hi!") - if messageType == "Play": - # A remote control play command has been sent from the server. - itemIds = data['ItemIds'] - command = data['PlayCommand'] + return None, None - pl = playlist.Playlist() - dialog = xbmcgui.Dialog() - - if command == "PlayNow": - dialog.notification( - heading="Emby for Kodi", - message="%s %s" % (len(itemIds), lang(33004)), - icon="special://home/addons/plugin.video.emby/icon.png", - sound=False) - startat = data.get('StartPositionTicks', 0) - pl.playAll(itemIds, startat) - - elif command == "PlayNext": - dialog.notification( - heading="Emby for Kodi", - message="%s %s" % (len(itemIds), lang(33005)), - icon="special://home/addons/plugin.video.emby/icon.png", - sound=False) - newplaylist = pl.modifyPlaylist(itemIds) - player = xbmc.Player() - if not player.isPlaying(): - # Only start the playlist if nothing is playing - player.play(newplaylist) - - elif messageType == "Playstate": - # A remote control update playstate command has been sent from the server. - command = data['Command'] - player = xbmc.Player() - - actions = { - - 'Stop': player.stop, - 'Unpause': player.pause, - 'Pause': player.pause, - 'NextTrack': player.playnext, - 'PreviousTrack': player.playprevious, - 'Seek': player.seekTime - } - action = actions[command] - if command == "Seek": - seekto = data['SeekPositionTicks'] - seektime = seekto / 10000000.0 - action(seektime) - log("Seek to %s." % seektime, 1) - else: - action() - log("Command: %s completed." % command, 1) - - window('emby_command', value="true") - - elif messageType == "UserDataChanged": - # A user changed their personal rating for an item, or their playstate was updated - userdata_list = data['UserDataList'] - self.librarySync.triage_items("userdata", userdata_list) - - elif messageType == "LibraryChanged": - - librarySync = self.librarySync - processlist = { - - 'added': data['ItemsAdded'], - 'update': data['ItemsUpdated'], - 'remove': data['ItemsRemoved'] - } - for action in processlist: - librarySync.triage_items(action, processlist[action]) - - elif messageType == "GeneralCommand": - - command = data['Name'] - arguments = data['Arguments'] - - if command in ('Mute', 'Unmute', 'SetVolume', - 'SetSubtitleStreamIndex', 'SetAudioStreamIndex'): - - player = xbmc.Player() - # These commands need to be reported back - if command == "Mute": - xbmc.executebuiltin('Mute') - elif command == "Unmute": - xbmc.executebuiltin('Mute') - elif command == "SetVolume": - volume = arguments['Volume'] - xbmc.executebuiltin('SetVolume(%s[,showvolumebar])' % volume) - elif command == "SetAudioStreamIndex": - index = int(arguments['Index']) - player.setAudioStream(index - 1) - elif command == "SetSubtitleStreamIndex": - embyindex = int(arguments['Index']) - currentFile = player.getPlayingFile() - - mapping = window('emby_%s.indexMapping' % currentFile) - if mapping: - externalIndex = json.loads(mapping) - # If there's external subtitles added via playbackutils - for index in externalIndex: - if externalIndex[index] == embyindex: - player.setSubtitleStream(int(index)) - break - else: - # User selected internal subtitles - external = len(externalIndex) - audioTracks = len(player.getAvailableAudioStreams()) - player.setSubtitleStream(external + embyindex - audioTracks - 1) - else: - # Emby merges audio and subtitle index together - audioTracks = len(player.getAvailableAudioStreams()) - player.setSubtitleStream(index - audioTracks - 1) - - # Let service know - window('emby_command', value="true") - - elif command == "DisplayMessage": - - header = arguments['Header'] - text = arguments['Text'] - xbmcgui.Dialog().notification( - heading=header, - message=text, - icon="special://home/addons/plugin.video.emby/icon.png", - time=4000) - - elif command == "SendString": - - string = arguments['String'] - text = { - - 'jsonrpc': "2.0", - 'id': 0, - 'method': "Input.SendText", - 'params': { - - 'text': "%s" % string, - 'done': False - } - } - result = xbmc.executeJSONRPC(json.dumps(text)) - - else: - builtin = { - - 'ToggleFullscreen': 'Action(FullScreen)', - 'ToggleOsdMenu': 'Action(OSD)', - 'ToggleContextMenu': 'Action(ContextMenu)', - 'MoveUp': 'Action(Up)', - 'MoveDown': 'Action(Down)', - 'MoveLeft': 'Action(Left)', - 'MoveRight': 'Action(Right)', - 'Select': 'Action(Select)', - 'Back': 'Action(back)', - 'GoHome': 'ActivateWindow(Home)', - 'PageUp': 'Action(PageUp)', - 'NextLetter': 'Action(NextLetter)', - 'GoToSearch': 'VideoLibrary.Search', - 'GoToSettings': 'ActivateWindow(Settings)', - 'PageDown': 'Action(PageDown)', - 'PreviousLetter': 'Action(PrevLetter)', - 'TakeScreenshot': 'TakeScreenshot', - 'ToggleMute': 'Mute', - 'VolumeUp': 'Action(VolumeUp)', - 'VolumeDown': 'Action(VolumeDown)', - } - action = builtin.get(command) - if action: - xbmc.executebuiltin(action) - - elif messageType == "ServerRestarting": - if utils.settings('supressRestartMsg') == "true": - xbmcgui.Dialog().notification( - heading="Emby for Kodi", - message=lang(33006), - icon="special://home/addons/plugin.video.emby/icon.png") - - elif messageType == "UserConfigurationUpdated": - # Update user data set in userclient - userclient.UserClient().userSettings = data - self.librarySync.refresh_views = True - - def on_close(self, ws): - self.logMsg("Closed.", 2) - - def on_open(self, ws): - return - # Can we post something to Plex? - self.doUtils.postCapabilities(self.deviceId) - - def on_error(self, ws, error): - if "10061" in str(error): - # Server is offline - pass - else: - self.logMsg("Error: %s" % error, 2) - - def run(self): - - log = self.logMsg - window = utils.window - - # websocket.enableTrace(True) - - server = window('pms_server') + def getUri(self): + server = utils.window('pms_server') # Need to use plex.tv token, if any. NOT user token - token = window('plex_token') + token = utils.window('plex_token') # Get the appropriate prefix for the websocket if "https" in server: @@ -324,35 +97,62 @@ class WebSocket_Client(threading.Thread): else: server = server.replace('http', "ws") - websocket_url = "%s/:/websockets/notifications" % server + uri = "%s/:/websockets/notifications" % server if token: - websocket_url += '?X-Plex-Token=%s' % token - log("websocket url: %s" % websocket_url, 1) + uri += '?X-Plex-Token=%s' % token + return uri + def run(self): + log = self.logMsg + # Currently not working due to missing SSL environment sslopt = {} if utils.settings('sslverify') == "false": sslopt["cert_reqs"] = ssl.CERT_NONE - self.client = websocket.WebSocketApp( - websocket_url, - on_message=self.on_message, - on_error=self.on_error, - on_close=self.on_close) - - self.client.on_open = self.on_open log("----===## Starting WebSocketClient ##===----", 0) - while not self.threadStopped(): - self.client.run_forever(ping_interval=10, - sslopt=sslopt) - xbmc.sleep(100) + threadStopped = self.threadStopped + threadSuspended = self.threadSuspended + while not threadStopped(): + # In the event the server goes offline + while threadSuspended(): + # Set in service.py + if threadStopped(): + # Abort was requested while waiting. We should exit + log("##===---- WebSocketClient Stopped ----===##", 0) + return + xbmc.sleep(1000) + try: + self.process(*self.receive(self.ws)) + except websocket.WebSocketTimeoutException: + # No worries if read timed out + pass + except websocket.WebSocketConnectionClosedException: + log("Connection closed, (re)connecting", 0) + uri = self.getUri() + try: + # Low timeout - let's us shut this thread down! + self.ws = websocket.create_connection( + uri, + timeout=1, + sslopt=sslopt, + enable_multithread=True) + except (IOError): + log("Error connecting", 0) + xbmc.sleep(1000) + except Exception as e: + log("Unknown exception encountered: %s" % e) + pass log("##===---- WebSocketClient Stopped ----===##", 0) def stopThread(self): """ - Overwrite this method from ThreadMethods to close websockets first + Overwrite this method from ThreadMethods to close websockets """ self.logMsg("Stopping websocket client thread.", 1) - self.client.close() self._threadStopped = True + try: + self.ws.shutdown() + except: + pass diff --git a/service.py b/service.py index e88e47d0..8d27bd87 100644 --- a/service.py +++ b/service.py @@ -111,12 +111,12 @@ class Service(): # Server auto-detect initialsetup.InitialSetup().setup() - # Queue and lock for background sync - queue = Queue.LifoQueue(maxsize=100) + # Queue for background sync + queue = Queue.Queue(maxsize=200) # Initialize important threads user = userclient.UserClient() - ws = wsc.WebSocket_Client(queue) + ws = wsc.WebSocket(queue) library = librarysync.LibrarySync(queue) kplayer = player.Player() xplayer = xbmc.Player() @@ -197,7 +197,6 @@ class Service(): ws.start() # Start the syncing thread if not self.library_running: - log('Starting libary sync thread', 1) self.library_running = True library.start() # Start the Plex Companion thread