Rewired websockets

This commit is contained in:
tomkat83 2016-03-29 18:44:13 +02:00
parent c401cd9835
commit dc7755ba6d
4 changed files with 95 additions and 306 deletions

View file

@ -103,4 +103,4 @@ class PlexCompanion(threading.Thread):
pass pass
finally: finally:
httpd.socket.close() httpd.socket.close()
self.logMsg("----===## STOP Plex Companion ##===----", 0) self.logMsg("----===## Plex Companion stopped ##===----", 0)

View file

@ -46,8 +46,6 @@ import logging
import traceback import traceback
import sys import sys
import xbmc
""" """
websocket python client. websocket python client.
========================= =========================
@ -390,9 +388,6 @@ class WebSocket(object):
self._frame_mask = None self._frame_mask = None
self._cont_data = None self._cont_data = None
# Do not allow simultaneous send - leads to SSL issues!
self.lock = threading.Lock()
def fileno(self): def fileno(self):
return self.sock.fileno() return self.sock.fileno()
@ -568,10 +563,9 @@ class WebSocket(object):
length = len(data) length = len(data)
if traceEnabled: if traceEnabled:
logger.debug("send: " + repr(data)) logger.debug("send: " + repr(data))
with self.lock: while data:
while data: l = self._send(data)
l = self._send(data) data = data[l:]
data = data[l:]
return length return length
def send_binary(self, payload): def send_binary(self, payload):
@ -734,12 +728,9 @@ class WebSocket(object):
except socket.timeout as e: except socket.timeout as e:
raise WebSocketTimeoutException(e.args[0]) raise WebSocketTimeoutException(e.args[0])
except Exception as e: except Exception as e:
try: if "timed out" in e.args[0]:
if "timed out" in e.args[0]: raise WebSocketTimeoutException(e.args[0])
raise WebSocketTimeoutException(e.args[0]) else:
else:
raise e
except:
raise e raise e
def _recv(self, bufsize): def _recv(self, bufsize):
@ -888,7 +879,6 @@ class WebSocketApp(object):
#print str(e.args[0]) #print str(e.args[0])
if "timed out" not in e.args[0]: if "timed out" not in e.args[0]:
raise e raise e
xbmc.sleep(100)
except Exception, e: except Exception, e:
self._callback(self.on_error, e) self._callback(self.on_error, e)

View file

@ -9,314 +9,87 @@ import websocket
import ssl import ssl
import xbmc import xbmc
import xbmcgui
import clientinfo
import downloadutils
import playlist
import userclient
import utils import utils
import logging
logging.basicConfig()
############################################################################### ###############################################################################
TIMELINE_STATES = {
0: 'created',
2: 'matching',
3: 'downloading',
4: 'loading',
5: 'finished',
6: 'analyzing',
9: 'deleted'
}
@utils.logging @utils.logging
@utils.ThreadMethods @utils.ThreadMethods
class WebSocket_Client(threading.Thread): class WebSocket(threading.Thread):
""" opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
websocket_client.WebSocket_Client(queue)
where (communication with librarysync)
queue: Queue object for background sync
"""
_shared_state = {}
client = None
stopWebsocket = False
def __init__(self, queue): def __init__(self, queue):
self.ws = None
self.__dict__ = self._shared_state
# Communication with librarysync # Communication with librarysync
self.queue = queue self.queue = queue
self.doUtils = downloadutils.DownloadUtils()
self.clientInfo = clientinfo.ClientInfo()
self.deviceId = self.clientInfo.getDeviceId()
threading.Thread.__init__(self) threading.Thread.__init__(self)
def sendProgressUpdate(self, data): def process(self, opcode, message):
log = self.logMsg if opcode not in self.opcode_data:
return False
log("sendProgressUpdate", 2)
try:
messageData = {
'MessageType': "ReportPlaybackProgress",
'Data': data
}
messageString = json.dumps(messageData)
self.client.send(messageString)
log("Message data: %s" % messageString, 2)
except Exception as e:
log("Exception: %s" % e, 1)
def on_message(self, ws, message):
"""
Will be called automatically if ws receives a message from PMS
"""
try: try:
message = json.loads(message) message = json.loads(message)
except Exception as e: except Exception as ex:
self.logMsg('Error decoding message from websocket: %s' % e, -1) self.logMsg('Error decoding message from websocket: %s' % ex, -1)
self.logMsg(message, -1)
return False return False
# Triage # Triage
self.logMsg('Message received: %s' % message, 2)
typus = message.get('type') typus = message.get('type')
if typus is None: if typus is None:
self.logMsg('No message type, dropping message: %s' % message, -1) self.logMsg('No message type, dropping message: %s' % message, -1)
return False return False
# Drop everything we're not interested in # Drop everything we're not interested in
if typus not in ('playing', 'timeline'): if typus not in ('playing', 'timeline'):
return return True
# Put PMS message on queue and let libsync take care of it # Put PMS message on queue and let libsync take care of it
try: try:
self.queue.put(message) self.queue.put(message)
return True return True
except Queue.Full: except Queue.Full:
# Queue only takes 100 messages. No worries if we miss one or two # Queue only takes 200 messages. No worries if we miss one or two
self.logMsg('Queue is full, dropping PMS message', 0) self.logMsg('Queue is full, dropping PMS message %s' % message, 0)
return False return False
def on_message_LEGACY_EMBY(self, ws, message): def receive(self, ws):
# Not connected yet
log = self.logMsg if ws is None:
window = utils.window raise websocket.WebSocketConnectionClosedException
lang = utils.language
result = json.loads(message) frame = ws.recv_frame()
messageType = result['MessageType']
data = result['Data']
if messageType not in ('SessionEnded'): if not frame:
# Mute certain events raise websocket.WebSocketException("Not a valid frame %s" % frame)
log("Message: %s" % message, 1) elif frame.opcode in self.opcode_data:
return frame.opcode, frame.data
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
ws.send_close()
return frame.opcode, None
elif frame.opcode == websocket.ABNF.OPCODE_PING:
ws.pong("Hi!")
if messageType == "Play": return None, None
# A remote control play command has been sent from the server.
itemIds = data['ItemIds']
command = data['PlayCommand']
pl = playlist.Playlist() def getUri(self):
dialog = xbmcgui.Dialog() server = utils.window('pms_server')
if command == "PlayNow":
dialog.notification(
heading="Emby for Kodi",
message="%s %s" % (len(itemIds), lang(33004)),
icon="special://home/addons/plugin.video.emby/icon.png",
sound=False)
startat = data.get('StartPositionTicks', 0)
pl.playAll(itemIds, startat)
elif command == "PlayNext":
dialog.notification(
heading="Emby for Kodi",
message="%s %s" % (len(itemIds), lang(33005)),
icon="special://home/addons/plugin.video.emby/icon.png",
sound=False)
newplaylist = pl.modifyPlaylist(itemIds)
player = xbmc.Player()
if not player.isPlaying():
# Only start the playlist if nothing is playing
player.play(newplaylist)
elif messageType == "Playstate":
# A remote control update playstate command has been sent from the server.
command = data['Command']
player = xbmc.Player()
actions = {
'Stop': player.stop,
'Unpause': player.pause,
'Pause': player.pause,
'NextTrack': player.playnext,
'PreviousTrack': player.playprevious,
'Seek': player.seekTime
}
action = actions[command]
if command == "Seek":
seekto = data['SeekPositionTicks']
seektime = seekto / 10000000.0
action(seektime)
log("Seek to %s." % seektime, 1)
else:
action()
log("Command: %s completed." % command, 1)
window('emby_command', value="true")
elif messageType == "UserDataChanged":
# A user changed their personal rating for an item, or their playstate was updated
userdata_list = data['UserDataList']
self.librarySync.triage_items("userdata", userdata_list)
elif messageType == "LibraryChanged":
librarySync = self.librarySync
processlist = {
'added': data['ItemsAdded'],
'update': data['ItemsUpdated'],
'remove': data['ItemsRemoved']
}
for action in processlist:
librarySync.triage_items(action, processlist[action])
elif messageType == "GeneralCommand":
command = data['Name']
arguments = data['Arguments']
if command in ('Mute', 'Unmute', 'SetVolume',
'SetSubtitleStreamIndex', 'SetAudioStreamIndex'):
player = xbmc.Player()
# These commands need to be reported back
if command == "Mute":
xbmc.executebuiltin('Mute')
elif command == "Unmute":
xbmc.executebuiltin('Mute')
elif command == "SetVolume":
volume = arguments['Volume']
xbmc.executebuiltin('SetVolume(%s[,showvolumebar])' % volume)
elif command == "SetAudioStreamIndex":
index = int(arguments['Index'])
player.setAudioStream(index - 1)
elif command == "SetSubtitleStreamIndex":
embyindex = int(arguments['Index'])
currentFile = player.getPlayingFile()
mapping = window('emby_%s.indexMapping' % currentFile)
if mapping:
externalIndex = json.loads(mapping)
# If there's external subtitles added via playbackutils
for index in externalIndex:
if externalIndex[index] == embyindex:
player.setSubtitleStream(int(index))
break
else:
# User selected internal subtitles
external = len(externalIndex)
audioTracks = len(player.getAvailableAudioStreams())
player.setSubtitleStream(external + embyindex - audioTracks - 1)
else:
# Emby merges audio and subtitle index together
audioTracks = len(player.getAvailableAudioStreams())
player.setSubtitleStream(index - audioTracks - 1)
# Let service know
window('emby_command', value="true")
elif command == "DisplayMessage":
header = arguments['Header']
text = arguments['Text']
xbmcgui.Dialog().notification(
heading=header,
message=text,
icon="special://home/addons/plugin.video.emby/icon.png",
time=4000)
elif command == "SendString":
string = arguments['String']
text = {
'jsonrpc': "2.0",
'id': 0,
'method': "Input.SendText",
'params': {
'text': "%s" % string,
'done': False
}
}
result = xbmc.executeJSONRPC(json.dumps(text))
else:
builtin = {
'ToggleFullscreen': 'Action(FullScreen)',
'ToggleOsdMenu': 'Action(OSD)',
'ToggleContextMenu': 'Action(ContextMenu)',
'MoveUp': 'Action(Up)',
'MoveDown': 'Action(Down)',
'MoveLeft': 'Action(Left)',
'MoveRight': 'Action(Right)',
'Select': 'Action(Select)',
'Back': 'Action(back)',
'GoHome': 'ActivateWindow(Home)',
'PageUp': 'Action(PageUp)',
'NextLetter': 'Action(NextLetter)',
'GoToSearch': 'VideoLibrary.Search',
'GoToSettings': 'ActivateWindow(Settings)',
'PageDown': 'Action(PageDown)',
'PreviousLetter': 'Action(PrevLetter)',
'TakeScreenshot': 'TakeScreenshot',
'ToggleMute': 'Mute',
'VolumeUp': 'Action(VolumeUp)',
'VolumeDown': 'Action(VolumeDown)',
}
action = builtin.get(command)
if action:
xbmc.executebuiltin(action)
elif messageType == "ServerRestarting":
if utils.settings('supressRestartMsg') == "true":
xbmcgui.Dialog().notification(
heading="Emby for Kodi",
message=lang(33006),
icon="special://home/addons/plugin.video.emby/icon.png")
elif messageType == "UserConfigurationUpdated":
# Update user data set in userclient
userclient.UserClient().userSettings = data
self.librarySync.refresh_views = True
def on_close(self, ws):
self.logMsg("Closed.", 2)
def on_open(self, ws):
return
# Can we post something to Plex?
self.doUtils.postCapabilities(self.deviceId)
def on_error(self, ws, error):
if "10061" in str(error):
# Server is offline
pass
else:
self.logMsg("Error: %s" % error, 2)
def run(self):
log = self.logMsg
window = utils.window
# websocket.enableTrace(True)
server = window('pms_server')
# Need to use plex.tv token, if any. NOT user token # Need to use plex.tv token, if any. NOT user token
token = window('plex_token') token = utils.window('plex_token')
# Get the appropriate prefix for the websocket # Get the appropriate prefix for the websocket
if "https" in server: if "https" in server:
@ -324,35 +97,62 @@ class WebSocket_Client(threading.Thread):
else: else:
server = server.replace('http', "ws") server = server.replace('http', "ws")
websocket_url = "%s/:/websockets/notifications" % server uri = "%s/:/websockets/notifications" % server
if token: if token:
websocket_url += '?X-Plex-Token=%s' % token uri += '?X-Plex-Token=%s' % token
log("websocket url: %s" % websocket_url, 1) return uri
def run(self):
log = self.logMsg
# Currently not working due to missing SSL environment
sslopt = {} sslopt = {}
if utils.settings('sslverify') == "false": if utils.settings('sslverify') == "false":
sslopt["cert_reqs"] = ssl.CERT_NONE sslopt["cert_reqs"] = ssl.CERT_NONE
self.client = websocket.WebSocketApp(
websocket_url,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close)
self.client.on_open = self.on_open
log("----===## Starting WebSocketClient ##===----", 0) log("----===## Starting WebSocketClient ##===----", 0)
while not self.threadStopped(): threadStopped = self.threadStopped
self.client.run_forever(ping_interval=10, threadSuspended = self.threadSuspended
sslopt=sslopt) while not threadStopped():
xbmc.sleep(100) # In the event the server goes offline
while threadSuspended():
# Set in service.py
if threadStopped():
# Abort was requested while waiting. We should exit
log("##===---- WebSocketClient Stopped ----===##", 0)
return
xbmc.sleep(1000)
try:
self.process(*self.receive(self.ws))
except websocket.WebSocketTimeoutException:
# No worries if read timed out
pass
except websocket.WebSocketConnectionClosedException:
log("Connection closed, (re)connecting", 0)
uri = self.getUri()
try:
# Low timeout - let's us shut this thread down!
self.ws = websocket.create_connection(
uri,
timeout=1,
sslopt=sslopt,
enable_multithread=True)
except (IOError):
log("Error connecting", 0)
xbmc.sleep(1000)
except Exception as e:
log("Unknown exception encountered: %s" % e)
pass
log("##===---- WebSocketClient Stopped ----===##", 0) log("##===---- WebSocketClient Stopped ----===##", 0)
def stopThread(self): def stopThread(self):
""" """
Overwrite this method from ThreadMethods to close websockets first Overwrite this method from ThreadMethods to close websockets
""" """
self.logMsg("Stopping websocket client thread.", 1) self.logMsg("Stopping websocket client thread.", 1)
self.client.close()
self._threadStopped = True self._threadStopped = True
try:
self.ws.shutdown()
except:
pass

View file

@ -111,12 +111,12 @@ class Service():
# Server auto-detect # Server auto-detect
initialsetup.InitialSetup().setup() initialsetup.InitialSetup().setup()
# Queue and lock for background sync # Queue for background sync
queue = Queue.LifoQueue(maxsize=100) queue = Queue.Queue(maxsize=200)
# Initialize important threads # Initialize important threads
user = userclient.UserClient() user = userclient.UserClient()
ws = wsc.WebSocket_Client(queue) ws = wsc.WebSocket(queue)
library = librarysync.LibrarySync(queue) library = librarysync.LibrarySync(queue)
kplayer = player.Player() kplayer = player.Player()
xplayer = xbmc.Player() xplayer = xbmc.Player()
@ -197,7 +197,6 @@ class Service():
ws.start() ws.start()
# Start the syncing thread # Start the syncing thread
if not self.library_running: if not self.library_running:
log('Starting libary sync thread', 1)
self.library_running = True self.library_running = True
library.start() library.start()
# Start the Plex Companion thread # Start the Plex Companion thread