diff --git a/resources/lib/websocket.py b/resources/lib/websocket.py new file mode 100644 index 00000000..3d777a97 --- /dev/null +++ b/resources/lib/websocket.py @@ -0,0 +1,912 @@ +""" +websocket - WebSocket client library for Python + +Copyright (C) 2010 Hiroki Ohtani(liris) + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +""" + + +import socket + +try: + import ssl + from ssl import SSLError + HAVE_SSL = True +except ImportError: + # dummy class of SSLError for ssl none-support environment. + class SSLError(Exception): + pass + + HAVE_SSL = False + +from urlparse import urlparse +import os +import array +import struct +import uuid +import hashlib +import base64 +import threading +import time +import logging +import traceback +import sys + +""" +websocket python client. +========================= + +This version support only hybi-13. +Please see http://tools.ietf.org/html/rfc6455 for protocol. +""" + + +# websocket supported version. +VERSION = 13 + +# closing frame status codes. +STATUS_NORMAL = 1000 +STATUS_GOING_AWAY = 1001 +STATUS_PROTOCOL_ERROR = 1002 +STATUS_UNSUPPORTED_DATA_TYPE = 1003 +STATUS_STATUS_NOT_AVAILABLE = 1005 +STATUS_ABNORMAL_CLOSED = 1006 +STATUS_INVALID_PAYLOAD = 1007 +STATUS_POLICY_VIOLATION = 1008 +STATUS_MESSAGE_TOO_BIG = 1009 +STATUS_INVALID_EXTENSION = 1010 +STATUS_UNEXPECTED_CONDITION = 1011 +STATUS_TLS_HANDSHAKE_ERROR = 1015 + +logger = logging.getLogger() + + +class WebSocketException(Exception): + """ + websocket exeception class. + """ + pass + + +class WebSocketConnectionClosedException(WebSocketException): + """ + If remote host closed the connection or some network error happened, + this exception will be raised. + """ + pass + +class WebSocketTimeoutException(WebSocketException): + """ + WebSocketTimeoutException will be raised at socket timeout during read/write data. + """ + pass + +default_timeout = None +traceEnabled = False + + +def enableTrace(tracable): + """ + turn on/off the tracability. + + tracable: boolean value. if set True, tracability is enabled. + """ + global traceEnabled + traceEnabled = tracable + if tracable: + if not logger.handlers: + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + + +def setdefaulttimeout(timeout): + """ + Set the global timeout setting to connect. + + timeout: default socket timeout time. This value is second. + """ + global default_timeout + default_timeout = timeout + + +def getdefaulttimeout(): + """ + Return the global timeout setting(second) to connect. + """ + return default_timeout + + +def _parse_url(url): + """ + parse url and the result is tuple of + (hostname, port, resource path and the flag of secure mode) + + url: url string. + """ + if ":" not in url: + raise ValueError("url is invalid") + + scheme, url = url.split(":", 1) + + parsed = urlparse(url, scheme="http") + if parsed.hostname: + hostname = parsed.hostname + else: + raise ValueError("hostname is invalid") + port = 0 + if parsed.port: + port = parsed.port + + is_secure = False + if scheme == "ws": + if not port: + port = 80 + elif scheme == "wss": + is_secure = True + if not port: + port = 443 + else: + raise ValueError("scheme %s is invalid" % scheme) + + if parsed.path: + resource = parsed.path + else: + resource = "/" + + if parsed.query: + resource += "?" + parsed.query + + return (hostname, port, resource, is_secure) + + +def create_connection(url, timeout=None, **options): + """ + connect to url and return websocket object. + + Connect to url and return the WebSocket object. + Passing optional timeout parameter will set the timeout on the socket. + If no timeout is supplied, the global default timeout setting returned by getdefauttimeout() is used. + You can customize using 'options'. + If you set "header" list object, you can set your own custom header. + + >>> conn = create_connection("ws://echo.websocket.org/", + ... header=["User-Agent: MyProgram", + ... "x-custom: header"]) + + + timeout: socket timeout time. This value is integer. + if you set None for this value, it means "use default_timeout value" + + options: current support option is only "header". + if you set header as dict value, the custom HTTP headers are added. + """ + sockopt = options.get("sockopt", []) + sslopt = options.get("sslopt", {}) + websock = WebSocket(sockopt=sockopt, sslopt=sslopt) + websock.settimeout(timeout if timeout is not None else default_timeout) + websock.connect(url, **options) + return websock + +_MAX_INTEGER = (1 << 32) -1 +_AVAILABLE_KEY_CHARS = range(0x21, 0x2f + 1) + range(0x3a, 0x7e + 1) +_MAX_CHAR_BYTE = (1<<8) -1 + +# ref. Websocket gets an update, and it breaks stuff. +# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html + + +def _create_sec_websocket_key(): + uid = uuid.uuid4() + return base64.encodestring(uid.bytes).strip() + + +_HEADERS_TO_CHECK = { + "upgrade": "websocket", + "connection": "upgrade", + } + + +class ABNF(object): + """ + ABNF frame class. + see http://tools.ietf.org/html/rfc5234 + and http://tools.ietf.org/html/rfc6455#section-5.2 + """ + + # operation code values. + OPCODE_CONT = 0x0 + OPCODE_TEXT = 0x1 + OPCODE_BINARY = 0x2 + OPCODE_CLOSE = 0x8 + OPCODE_PING = 0x9 + OPCODE_PONG = 0xa + + # available operation code value tuple + OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, + OPCODE_PING, OPCODE_PONG) + + # opcode human readable string + OPCODE_MAP = { + OPCODE_CONT: "cont", + OPCODE_TEXT: "text", + OPCODE_BINARY: "binary", + OPCODE_CLOSE: "close", + OPCODE_PING: "ping", + OPCODE_PONG: "pong" + } + + # data length threashold. + LENGTH_7 = 0x7d + LENGTH_16 = 1 << 16 + LENGTH_63 = 1 << 63 + + def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0, + opcode=OPCODE_TEXT, mask=1, data=""): + """ + Constructor for ABNF. + please check RFC for arguments. + """ + self.fin = fin + self.rsv1 = rsv1 + self.rsv2 = rsv2 + self.rsv3 = rsv3 + self.opcode = opcode + self.mask = mask + self.data = data + self.get_mask_key = os.urandom + + def __str__(self): + return "fin=" + str(self.fin) \ + + " opcode=" + str(self.opcode) \ + + " data=" + str(self.data) + + @staticmethod + def create_frame(data, opcode): + """ + create frame to send text, binary and other data. + + data: data to send. This is string value(byte array). + if opcode is OPCODE_TEXT and this value is uniocde, + data value is conveted into unicode string, automatically. + + opcode: operation code. please see OPCODE_XXX. + """ + if opcode == ABNF.OPCODE_TEXT and isinstance(data, unicode): + data = data.encode("utf-8") + # mask must be set if send data from client + return ABNF(1, 0, 0, 0, opcode, 1, data) + + def format(self): + """ + format this object to string(byte array) to send data to server. + """ + if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]): + raise ValueError("not 0 or 1") + if self.opcode not in ABNF.OPCODES: + raise ValueError("Invalid OPCODE") + length = len(self.data) + if length >= ABNF.LENGTH_63: + raise ValueError("data is too long") + + frame_header = chr(self.fin << 7 + | self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 + | self.opcode) + if length < ABNF.LENGTH_7: + frame_header += chr(self.mask << 7 | length) + elif length < ABNF.LENGTH_16: + frame_header += chr(self.mask << 7 | 0x7e) + frame_header += struct.pack("!H", length) + else: + frame_header += chr(self.mask << 7 | 0x7f) + frame_header += struct.pack("!Q", length) + + if not self.mask: + return frame_header + self.data + else: + mask_key = self.get_mask_key(4) + return frame_header + self._get_masked(mask_key) + + def _get_masked(self, mask_key): + s = ABNF.mask(mask_key, self.data) + return mask_key + "".join(s) + + @staticmethod + def mask(mask_key, data): + """ + mask or unmask data. Just do xor for each byte + + mask_key: 4 byte string(byte). + + data: data to mask/unmask. + """ + _m = array.array("B", mask_key) + _d = array.array("B", data) + for i in xrange(len(_d)): + _d[i] ^= _m[i % 4] + return _d.tostring() + + +class WebSocket(object): + """ + Low level WebSocket interface. + This class is based on + The WebSocket protocol draft-hixie-thewebsocketprotocol-76 + http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 + + We can connect to the websocket server and send/recieve data. + The following example is a echo client. + + >>> import websocket + >>> ws = websocket.WebSocket() + >>> ws.connect("ws://echo.websocket.org") + >>> ws.send("Hello, Server") + >>> ws.recv() + 'Hello, Server' + >>> ws.close() + + get_mask_key: a callable to produce new mask keys, see the set_mask_key + function's docstring for more details + sockopt: values for socket.setsockopt. + sockopt must be tuple and each element is argument of sock.setscokopt. + sslopt: dict object for ssl socket option. + """ + + def __init__(self, get_mask_key=None, sockopt=None, sslopt=None): + """ + Initalize WebSocket object. + """ + if sockopt is None: + sockopt = [] + if sslopt is None: + sslopt = {} + self.connected = False + self.sock = socket.socket() + for opts in sockopt: + self.sock.setsockopt(*opts) + self.sslopt = sslopt + self.get_mask_key = get_mask_key + # Buffers over the packets from the layer beneath until desired amount + # bytes of bytes are received. + self._recv_buffer = [] + # These buffer over the build-up of a single frame. + self._frame_header = None + self._frame_length = None + self._frame_mask = None + self._cont_data = None + + def fileno(self): + return self.sock.fileno() + + def set_mask_key(self, func): + """ + set function to create musk key. You can custumize mask key generator. + Mainly, this is for testing purpose. + + func: callable object. the fuct must 1 argument as integer. + The argument means length of mask key. + This func must be return string(byte array), + which length is argument specified. + """ + self.get_mask_key = func + + def gettimeout(self): + """ + Get the websocket timeout(second). + """ + return self.sock.gettimeout() + + def settimeout(self, timeout): + """ + Set the timeout to the websocket. + + timeout: timeout time(second). + """ + self.sock.settimeout(timeout) + + timeout = property(gettimeout, settimeout) + + def connect(self, url, **options): + """ + Connect to url. url is websocket url scheme. ie. ws://host:port/resource + You can customize using 'options'. + If you set "header" dict object, you can set your own custom header. + + >>> ws = WebSocket() + >>> ws.connect("ws://echo.websocket.org/", + ... header={"User-Agent: MyProgram", + ... "x-custom: header"}) + + timeout: socket timeout time. This value is integer. + if you set None for this value, + it means "use default_timeout value" + + options: current support option is only "header". + if you set header as dict value, + the custom HTTP headers are added. + + """ + hostname, port, resource, is_secure = _parse_url(url) + # TODO: we need to support proxy + self.sock.connect((hostname, port)) + if is_secure: + if HAVE_SSL: + if self.sslopt is None: + sslopt = {} + else: + sslopt = self.sslopt + self.sock = ssl.wrap_socket(self.sock, **sslopt) + else: + raise WebSocketException("SSL not available.") + + self._handshake(hostname, port, resource, **options) + + def _handshake(self, host, port, resource, **options): + sock = self.sock + headers = [] + headers.append("GET %s HTTP/1.1" % resource) + headers.append("Upgrade: websocket") + headers.append("Connection: Upgrade") + if port == 80: + hostport = host + else: + hostport = "%s:%d" % (host, port) + headers.append("Host: %s" % hostport) + + if "origin" in options: + headers.append("Origin: %s" % options["origin"]) + else: + headers.append("Origin: http://%s" % hostport) + + key = _create_sec_websocket_key() + headers.append("Sec-WebSocket-Key: %s" % key) + headers.append("Sec-WebSocket-Version: %s" % VERSION) + if "header" in options: + headers.extend(options["header"]) + + headers.append("") + headers.append("") + + header_str = "\r\n".join(headers) + self._send(header_str) + if traceEnabled: + logger.debug("--- request header ---") + logger.debug(header_str) + logger.debug("-----------------------") + + status, resp_headers = self._read_headers() + if status != 101: + self.close() + raise WebSocketException("Handshake Status %d" % status) + + success = self._validate_header(resp_headers, key) + if not success: + self.close() + raise WebSocketException("Invalid WebSocket Header") + + self.connected = True + + def _validate_header(self, headers, key): + for k, v in _HEADERS_TO_CHECK.iteritems(): + r = headers.get(k, None) + if not r: + return False + r = r.lower() + if v != r: + return False + + result = headers.get("sec-websocket-accept", None) + if not result: + return False + result = result.lower() + + value = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" + hashed = base64.encodestring(hashlib.sha1(value).digest()).strip().lower() + return hashed == result + + def _read_headers(self): + status = None + headers = {} + if traceEnabled: + logger.debug("--- response header ---") + + while True: + line = self._recv_line() + if line == "\r\n": + break + line = line.strip() + if traceEnabled: + logger.debug(line) + if not status: + status_info = line.split(" ", 2) + status = int(status_info[1]) + else: + kv = line.split(":", 1) + if len(kv) == 2: + key, value = kv + headers[key.lower()] = value.strip().lower() + else: + raise WebSocketException("Invalid header") + + if traceEnabled: + logger.debug("-----------------------") + + return status, headers + + def send(self, payload, opcode=ABNF.OPCODE_TEXT): + """ + Send the data as string. + + payload: Payload must be utf-8 string or unicoce, + if the opcode is OPCODE_TEXT. + Otherwise, it must be string(byte array) + + opcode: operation code to send. Please see OPCODE_XXX. + """ + frame = ABNF.create_frame(payload, opcode) + if self.get_mask_key: + frame.get_mask_key = self.get_mask_key + data = frame.format() + length = len(data) + if traceEnabled: + logger.debug("send: " + repr(data)) + while data: + l = self._send(data) + data = data[l:] + return length + + def send_binary(self, payload): + return self.send(payload, ABNF.OPCODE_BINARY) + + def ping(self, payload=""): + """ + send ping data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PING) + + def pong(self, payload): + """ + send pong data. + + payload: data payload to send server. + """ + self.send(payload, ABNF.OPCODE_PONG) + + def recv(self): + """ + Receive string data(byte array) from the server. + + return value: string(byte array) value. + """ + opcode, data = self.recv_data() + return data + + def recv_data(self): + """ + Recieve data with operation code. + + return value: tuple of operation code and string(byte array) value. + """ + while True: + frame = self.recv_frame() + if not frame: + # handle error: + # 'NoneType' object has no attribute 'opcode' + raise WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): + if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: + raise WebSocketException("Illegal frame") + if self._cont_data: + self._cont_data[1] += frame.data + else: + self._cont_data = [frame.opcode, frame.data] + + if frame.fin: + data = self._cont_data + self._cont_data = None + return data + elif frame.opcode == ABNF.OPCODE_CLOSE: + self.send_close() + return (frame.opcode, None) + elif frame.opcode == ABNF.OPCODE_PING: + self.pong(frame.data) + + def recv_frame(self): + """ + recieve data as frame from server. + + return value: ABNF frame object. + """ + # Header + if self._frame_header is None: + self._frame_header = self._recv_strict(2) + b1 = ord(self._frame_header[0]) + fin = b1 >> 7 & 1 + rsv1 = b1 >> 6 & 1 + rsv2 = b1 >> 5 & 1 + rsv3 = b1 >> 4 & 1 + opcode = b1 & 0xf + b2 = ord(self._frame_header[1]) + has_mask = b2 >> 7 & 1 + # Frame length + if self._frame_length is None: + length_bits = b2 & 0x7f + if length_bits == 0x7e: + length_data = self._recv_strict(2) + self._frame_length = struct.unpack("!H", length_data)[0] + elif length_bits == 0x7f: + length_data = self._recv_strict(8) + self._frame_length = struct.unpack("!Q", length_data)[0] + else: + self._frame_length = length_bits + # Mask + if self._frame_mask is None: + self._frame_mask = self._recv_strict(4) if has_mask else "" + # Payload + payload = self._recv_strict(self._frame_length) + if has_mask: + payload = ABNF.mask(self._frame_mask, payload) + # Reset for next frame + self._frame_header = None + self._frame_length = None + self._frame_mask = None + return ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload) + + + def send_close(self, status=STATUS_NORMAL, reason=""): + """ + send close data to the server. + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + + def close(self, status=STATUS_NORMAL, reason=""): + """ + Close Websocket object + + status: status code to send. see STATUS_XXX. + + reason: the reason to close. This must be string. + """ + + try: + self.sock.shutdown(socket.SHUT_RDWR) + except: + pass + + ''' + if self.connected: + if status < 0 or status >= ABNF.LENGTH_16: + raise ValueError("code is invalid range") + + try: + self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) + timeout = self.sock.gettimeout() + self.sock.settimeout(3) + try: + frame = self.recv_frame() + if logger.isEnabledFor(logging.ERROR): + recv_status = struct.unpack("!H", frame.data)[0] + if recv_status != STATUS_NORMAL: + logger.error("close status: " + repr(recv_status)) + except: + pass + self.sock.settimeout(timeout) + self.sock.shutdown(socket.SHUT_RDWR) + except: + pass + ''' + self._closeInternal() + + def _closeInternal(self): + self.connected = False + self.sock.close() + + def _send(self, data): + try: + return self.sock.send(data) + except socket.timeout as e: + raise WebSocketTimeoutException(e.args[0]) + except Exception as e: + if "timed out" in e.args[0]: + raise WebSocketTimeoutException(e.args[0]) + else: + raise e + + def _recv(self, bufsize): + try: + bytes = self.sock.recv(bufsize) + except socket.timeout as e: + raise WebSocketTimeoutException(e.args[0]) + except SSLError as e: + if e.args[0] == "The read operation timed out": + raise WebSocketTimeoutException(e.args[0]) + else: + raise + if not bytes: + raise WebSocketConnectionClosedException() + return bytes + + + def _recv_strict(self, bufsize): + shortage = bufsize - sum(len(x) for x in self._recv_buffer) + while shortage > 0: + bytes = self._recv(shortage) + self._recv_buffer.append(bytes) + shortage -= len(bytes) + unified = "".join(self._recv_buffer) + if shortage == 0: + self._recv_buffer = [] + return unified + else: + self._recv_buffer = [unified[bufsize:]] + return unified[:bufsize] + + + def _recv_line(self): + line = [] + while True: + c = self._recv(1) + line.append(c) + if c == "\n": + break + return "".join(line) + + +class WebSocketApp(object): + """ + Higher level of APIs are provided. + The interface is like JavaScript WebSocket object. + """ + def __init__(self, url, header=[], + on_open=None, on_message=None, on_error=None, + on_close=None, keep_running=True, get_mask_key=None): + """ + url: websocket url. + header: custom header for websocket handshake. + on_open: callable object which is called at opening websocket. + this function has one argument. The arugment is this class object. + on_message: callbale object which is called when recieved data. + on_message has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is utf-8 string which we get from the server. + on_error: callable object which is called when we get error. + on_error has 2 arguments. + The 1st arugment is this class object. + The passing 2nd arugment is exception object. + on_close: callable object which is called when closed the connection. + this function has one argument. The arugment is this class object. + keep_running: a boolean flag indicating whether the app's main loop should + keep running, defaults to True + get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's + docstring for more information + """ + self.url = url + self.header = header + self.on_open = on_open + self.on_message = on_message + self.on_error = on_error + self.on_close = on_close + self.keep_running = keep_running + self.get_mask_key = get_mask_key + self.sock = None + + def send(self, data, opcode=ABNF.OPCODE_TEXT): + """ + send message. + data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. + opcode: operation code of data. default is OPCODE_TEXT. + """ + if self.sock.send(data, opcode) == 0: + raise WebSocketConnectionClosedException() + + def close(self): + """ + close websocket connection. + """ + self.keep_running = False + if(self.sock != None): + self.sock.close() + + def _send_ping(self, interval): + while True: + for i in range(interval): + time.sleep(1) + if not self.keep_running: + return + self.sock.ping() + + def run_forever(self, sockopt=None, sslopt=None, ping_interval=0): + """ + run event loop for WebSocket framework. + This loop is infinite loop and is alive during websocket is available. + sockopt: values for socket.setsockopt. + sockopt must be tuple and each element is argument of sock.setscokopt. + sslopt: ssl socket optional dict. + ping_interval: automatically send "ping" command every specified period(second) + if set to 0, not send automatically. + """ + if sockopt is None: + sockopt = [] + if sslopt is None: + sslopt = {} + if self.sock: + raise WebSocketException("socket is already opened") + thread = None + self.keep_running = True + + try: + self.sock = WebSocket(self.get_mask_key, sockopt=sockopt, sslopt=sslopt) + self.sock.settimeout(default_timeout) + self.sock.connect(self.url, header=self.header) + self._callback(self.on_open) + + if ping_interval: + thread = threading.Thread(target=self._send_ping, args=(ping_interval,)) + thread.setDaemon(True) + thread.start() + + while self.keep_running: + + try: + data = self.sock.recv() + + if data is None or self.keep_running == False: + break + self._callback(self.on_message, data) + + except Exception, e: + #print str(e.args[0]) + if "timed out" not in e.args[0]: + raise e + + except Exception, e: + self._callback(self.on_error, e) + finally: + if thread: + self.keep_running = False + self.sock.close() + self._callback(self.on_close) + self.sock = None + + def _callback(self, callback, *args): + if callback: + try: + callback(self, *args) + except Exception, e: + logger.error(e) + if True:#logger.isEnabledFor(logging.DEBUG): + _, _, tb = sys.exc_info() + traceback.print_tb(tb) + + +if __name__ == "__main__": + enableTrace(True) + ws = create_connection("ws://echo.websocket.org/") + print("Sending 'Hello, World'...") + ws.send("Hello, World") + print("Sent") + print("Receiving...") + result = ws.recv() + print("Received '%s'" % result) + ws.close() diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py new file mode 100644 index 00000000..acf0df36 --- /dev/null +++ b/resources/lib/websocket_client.py @@ -0,0 +1,327 @@ +# -*- coding: utf-8 -*- + +################################################################################################# + +import json +import threading +import websocket + +import xbmc +import xbmcgui + +import clientinfo +import downloadutils +import librarysync +import playlist +import userclient +import utils + +import logging +logging.basicConfig() + +################################################################################################# + + +class WebSocket_Client(threading.Thread): + + _shared_state = {} + + client = None + stopWebsocket = False + + + def __init__(self): + + self.__dict__ = self._shared_state + self.monitor = xbmc.Monitor() + + self.doUtils = downloadutils.DownloadUtils() + self.clientInfo = clientinfo.ClientInfo() + self.addonName = self.clientInfo.getAddonName() + self.deviceId = self.clientInfo.getDeviceId() + self.librarySync = librarysync.LibrarySync() + + threading.Thread.__init__(self) + + def logMsg(self, msg, lvl=1): + + self.className = self.__class__.__name__ + utils.logMsg("%s %s" % (self.addonName, self.className), msg, lvl) + + + def sendProgressUpdate(self, data): + + log = self.logMsg + + log("sendProgressUpdate", 2) + try: + messageData = { + + 'MessageType': "ReportPlaybackProgress", + 'Data': data + } + messageString = json.dumps(messageData) + self.client.send(messageString) + log("Message data: %s" % messageString, 2) + + except Exception as e: + log("Exception: %s" % e, 1) + + def on_message(self, ws, message): + + log = self.logMsg + window = utils.window + lang = utils.language + + result = json.loads(message) + messageType = result['MessageType'] + data = result['Data'] + + if messageType not in ('SessionEnded'): + # Mute certain events + log("Message: %s" % message, 1) + + if messageType == "Play": + # A remote control play command has been sent from the server. + itemIds = data['ItemIds'] + command = data['PlayCommand'] + + pl = playlist.Playlist() + dialog = xbmcgui.Dialog() + + if command == "PlayNow": + dialog.notification( + heading="Emby for Kodi", + message="%s %s" % (len(itemIds), lang(33004)), + icon="special://home/addons/plugin.video.emby/icon.png", + sound=False) + startat = data.get('StartPositionTicks', 0) + pl.playAll(itemIds, startat) + + elif command == "PlayNext": + dialog.notification( + heading="Emby for Kodi", + message="%s %s" % (len(itemIds), lang(33005)), + icon="special://home/addons/plugin.video.emby/icon.png", + sound=False) + newplaylist = pl.modifyPlaylist(itemIds) + player = xbmc.Player() + if not player.isPlaying(): + # Only start the playlist if nothing is playing + player.play(newplaylist) + + elif messageType == "Playstate": + # A remote control update playstate command has been sent from the server. + command = data['Command'] + player = xbmc.Player() + + actions = { + + 'Stop': player.stop, + 'Unpause': player.pause, + 'Pause': player.pause, + 'NextTrack': player.playnext, + 'PreviousTrack': player.playprevious, + 'Seek': player.seekTime + } + action = actions[command] + if command == "Seek": + seekto = data['SeekPositionTicks'] + seektime = seekto / 10000000.0 + action(seektime) + log("Seek to %s." % seektime, 1) + else: + action() + log("Command: %s completed." % command, 1) + + window('emby_command', value="true") + + elif messageType == "UserDataChanged": + # A user changed their personal rating for an item, or their playstate was updated + userdata_list = data['UserDataList'] + self.librarySync.triage_items("userdata", userdata_list) + + elif messageType == "LibraryChanged": + + librarySync = self.librarySync + processlist = { + + 'added': data['ItemsAdded'], + 'update': data['ItemsUpdated'], + 'remove': data['ItemsRemoved'] + } + for action in processlist: + librarySync.triage_items(action, processlist[action]) + + elif messageType == "GeneralCommand": + + command = data['Name'] + arguments = data['Arguments'] + + if command in ('Mute', 'Unmute', 'SetVolume', + 'SetSubtitleStreamIndex', 'SetAudioStreamIndex'): + + player = xbmc.Player() + # These commands need to be reported back + if command == "Mute": + xbmc.executebuiltin('Mute') + elif command == "Unmute": + xbmc.executebuiltin('Mute') + elif command == "SetVolume": + volume = arguments['Volume'] + xbmc.executebuiltin('SetVolume(%s[,showvolumebar])' % volume) + elif command == "SetAudioStreamIndex": + index = int(arguments['Index']) + player.setAudioStream(index - 1) + elif command == "SetSubtitleStreamIndex": + embyindex = int(arguments['Index']) + currentFile = player.getPlayingFile() + + mapping = window('emby_%s.indexMapping' % currentFile) + if mapping: + externalIndex = json.loads(mapping) + # If there's external subtitles added via playbackutils + for index in externalIndex: + if externalIndex[index] == embyindex: + player.setSubtitleStream(int(index)) + break + else: + # User selected internal subtitles + external = len(externalIndex) + audioTracks = len(player.getAvailableAudioStreams()) + player.setSubtitleStream(external + embyindex - audioTracks - 1) + else: + # Emby merges audio and subtitle index together + audioTracks = len(player.getAvailableAudioStreams()) + player.setSubtitleStream(index - audioTracks - 1) + + # Let service know + window('emby_command', value="true") + + elif command == "DisplayMessage": + + header = arguments['Header'] + text = arguments['Text'] + xbmcgui.Dialog().notification( + heading=header, + message=text, + icon="special://home/addons/plugin.video.emby/icon.png", + time=4000) + + elif command == "SendString": + + string = arguments['String'] + text = { + + 'jsonrpc': "2.0", + 'id': 0, + 'method': "Input.SendText", + 'params': { + + 'text': "%s" % string, + 'done': False + } + } + result = xbmc.executeJSONRPC(json.dumps(text)) + + else: + builtin = { + + 'ToggleFullscreen': 'Action(FullScreen)', + 'ToggleOsdMenu': 'Action(OSD)', + 'ToggleContextMenu': 'Action(ContextMenu)', + 'MoveUp': 'Action(Up)', + 'MoveDown': 'Action(Down)', + 'MoveLeft': 'Action(Left)', + 'MoveRight': 'Action(Right)', + 'Select': 'Action(Select)', + 'Back': 'Action(back)', + 'GoHome': 'ActivateWindow(Home)', + 'PageUp': 'Action(PageUp)', + 'NextLetter': 'Action(NextLetter)', + 'GoToSearch': 'VideoLibrary.Search', + 'GoToSettings': 'ActivateWindow(Settings)', + 'PageDown': 'Action(PageDown)', + 'PreviousLetter': 'Action(PrevLetter)', + 'TakeScreenshot': 'TakeScreenshot', + 'ToggleMute': 'Mute', + 'VolumeUp': 'Action(VolumeUp)', + 'VolumeDown': 'Action(VolumeDown)', + } + action = builtin.get(command) + if action: + xbmc.executebuiltin(action) + + elif messageType == "ServerRestarting": + if utils.settings('supressRestartMsg') == "true": + xbmcgui.Dialog().notification( + heading="Emby for Kodi", + message=lang(33006), + icon="special://home/addons/plugin.video.emby/icon.png") + + elif messageType == "UserConfigurationUpdated": + # Update user data set in userclient + userclient.UserClient().userSettings = data + self.librarySync.refresh_views = True + + def on_close(self, ws): + self.logMsg("Closed.", 2) + + def on_open(self, ws): + self.doUtils.postCapabilities(self.deviceId) + + def on_error(self, ws, error): + if "10061" in str(error): + # Server is offline + pass + else: + self.logMsg("Error: %s" % error, 2) + + def run(self): + + log = self.logMsg + window = utils.window + monitor = self.monitor + + loglevel = int(window('emby_logLevel')) + # websocket.enableTrace(True) + + userId = window('emby_currUser') + server = window('emby_server%s' % userId) + token = window('emby_accessToken%s' % userId) + deviceId = self.deviceId + + # Get the appropriate prefix for the websocket + if "https" in server: + server = server.replace('https', "wss") + else: + server = server.replace('http', "ws") + + websocket_url = "%s?api_key=%s&deviceId=%s" % (server, token, deviceId) + log("websocket url: %s" % websocket_url, 1) + + self.client = websocket.WebSocketApp(websocket_url, + on_message=self.on_message, + on_error=self.on_error, + on_close=self.on_close) + + self.client.on_open = self.on_open + log("----===## Starting WebSocketClient ##===----", 0) + + while not monitor.abortRequested(): + + self.client.run_forever(ping_interval=10) + if self.stopWebsocket: + break + + if monitor.waitForAbort(5): + # Abort was requested, exit + break + + log("##===---- WebSocketClient Stopped ----===##", 0) + + def stopClient(self): + + self.stopWebsocket = True + self.client.close() + self.logMsg("Stopping thread.", 1) \ No newline at end of file