From c4433644efcbda114949d63f9b5d4bda75345e5c Mon Sep 17 00:00:00 2001 From: croneter Date: Wed, 17 Mar 2021 21:13:11 +0100 Subject: [PATCH] Adapt websocket client logic --- .../resource.language.en_gb/strings.po | 15 +- resources/lib/app/libsync.py | 5 - resources/lib/service_entry.py | 11 +- resources/lib/sync.py | 2 +- resources/lib/websocket.py | 917 ------------------ resources/lib/websocket_client.py | 460 ++++----- 6 files changed, 252 insertions(+), 1158 deletions(-) delete mode 100644 resources/lib/websocket.py diff --git a/resources/language/resource.language.en_gb/strings.po b/resources/language/resource.language.en_gb/strings.po index c2cdd294..fcc80e22 100644 --- a/resources/language/resource.language.en_gb/strings.po +++ b/resources/language/resource.language.en_gb/strings.po @@ -1134,11 +1134,6 @@ msgctxt "#39089" msgid "Alexa connection status:" msgstr "" -# PKC Settings - Connection - Background sync connection status -msgctxt "#39090" -msgid "Suspended - not connected" -msgstr "" - # PKC Settings - Connection - Background sync connection status msgctxt "#39091" msgid "Timeout - not connected" @@ -1149,6 +1144,16 @@ msgctxt "#39092" msgid "IOError - not connected" msgstr "" +# PKC Settings - Connection - Background sync connection status +msgctxt "#39093" +msgid "Suspended - not connected" +msgstr "" + +# PKC Settings - Connection - Background sync connection status +msgctxt "#39094" +msgid "Managed Plex User - not connected" +msgstr "" + msgctxt "#39200" msgid "Log-out Plex Home User " msgstr "" diff --git a/resources/lib/app/libsync.py b/resources/lib/app/libsync.py index 84a9b783..397abe9c 100644 --- a/resources/lib/app/libsync.py +++ b/resources/lib/app/libsync.py @@ -55,8 +55,6 @@ class Sync(object): # How often shall we sync? self.full_sync_intervall = None - # Background Sync disabled? - self.background_sync_disabled = None # How long shall we wait with synching a new item to make sure Plex got all # metadata? self.backgroundsync_saftymargin = None @@ -79,7 +77,6 @@ class Sync(object): # List of section_ids we're synching to Kodi - will be automatically # re-built if sections are set a-new self.section_ids = set() - self.enable_alexa = None self.load() @@ -120,8 +117,6 @@ class Sync(object): Any settings unrelated to syncs to the Kodi database - can thus be safely reset without a Kodi reboot """ - self.background_sync_disabled = utils.settings('enableBackgroundSync') == 'false' - self.enable_alexa = utils.settings('enable_alexa') == 'true' self.sync_dialog = utils.settings('dbSyncIndicator') == 'true' self.full_sync_intervall = int(utils.settings('fullSyncInterval')) * 60 self.backgroundsync_saftymargin = int(utils.settings('backgroundsync_saftyMargin')) diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index 28c30612..ce3e9678 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -98,7 +98,8 @@ class Service(object): self.welcome_msg = True self.connection_check_counter = 0 self.setup = None - self.alexa = None + self.pms_ws = None + self.alexa_ws = None self.playqueue = None # Flags for other threads self.connection_check_running = False @@ -444,8 +445,8 @@ class Service(object): self.setup.setup() # Initialize important threads - self.ws = websocket_client.PMS_Websocket() - self.alexa = websocket_client.Alexa_Websocket() + self.pms_ws = websocket_client.get_pms_websocketapp() + self.alexa_ws = websocket_client.get_alexa_websocketapp() self.sync = sync.Sync() self.plexcompanion = plex_companion.PlexCompanion() self.playqueue = playqueue.PlayqueueMonitor() @@ -545,11 +546,11 @@ class Service(object): continue elif not self.startup_completed: self.startup_completed = True - self.ws.start() + self.pms_ws.start() self.sync.start() self.plexcompanion.start() self.playqueue.start() - self.alexa.start() + self.alexa_ws.start() elif app.APP.is_playing: skip_plex_intro.check() diff --git a/resources/lib/sync.py b/resources/lib/sync.py index 43eb0a74..9eb5c519 100644 --- a/resources/lib/sync.py +++ b/resources/lib/sync.py @@ -224,7 +224,7 @@ class Sync(backgroundthread.KillableThread): not app.APP.is_playing_video): LOG.info('Doing scheduled full library scan') self.start_library_sync() - elif not app.SYNC.background_sync_disabled: + else: # Check back whether we should process something Only do # this once a while (otherwise, potentially many screen # refreshes lead to flickering) diff --git a/resources/lib/websocket.py b/resources/lib/websocket.py deleted file mode 100644 index e730c1b2..00000000 --- a/resources/lib/websocket.py +++ /dev/null @@ -1,917 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -websocket - WebSocket client library for Python - -Copyright (C) 2010 Hiroki Ohtani(liris) - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, write to the Free Software - Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -""" -import socket - -try: - import ssl - from ssl import SSLError - HAVE_SSL = True -except ImportError: - class SSLError(Exception): - """ - Dummy class of SSLError for ssl none-support environment. - """ - pass - - HAVE_SSL = False - -from urllib.parse import urlparse -import os -import array -import struct -import uuid -import hashlib -import base64 -import threading -import logging -import traceback -import sys - -from . import app - -############################################################################### - -LOG = logging.getLogger('PLEX.websocket') - -############################################################################### - -""" -websocket python client. -========================= - -This version support only hybi-13. -Please see http://tools.ietf.org/html/rfc6455 for protocol. -""" - - -# websocket supported version. -VERSION = 13 - -# closing frame status codes. -STATUS_NORMAL = 1000 -STATUS_GOING_AWAY = 1001 -STATUS_PROTOCOL_ERROR = 1002 -STATUS_UNSUPPORTED_DATA_TYPE = 1003 -STATUS_STATUS_NOT_AVAILABLE = 1005 -STATUS_ABNORMAL_CLOSED = 1006 -STATUS_INVALID_PAYLOAD = 1007 -STATUS_POLICY_VIOLATION = 1008 -STATUS_MESSAGE_TOO_BIG = 1009 -STATUS_INVALID_EXTENSION = 1010 -STATUS_UNEXPECTED_CONDITION = 1011 -STATUS_TLS_HANDSHAKE_ERROR = 1015 - - -class WebSocketException(Exception): - """ - websocket exeception class. - """ - pass - - -class WebSocketConnectionClosedException(WebSocketException): - """ - If remote host closed the connection or some network error happened, - this exception will be raised. - """ - pass - - -class WebSocketTimeoutException(WebSocketException): - """ - WebSocketTimeoutException will be raised at socket timeout during read and - write data. - """ - pass - - -class WebsocketRedirect(WebSocketException): - """ - WebsocketRedirect will be raised if a status code 301 is returned - The Exception will be instantiated with a dict containing all response - headers; which should contain the redirect address under the key 'location' - - Access the headers via the attribute headers - """ - def __init__(self, headers): - self.headers = headers - super(WebsocketRedirect, self).__init__() - - -DEFAULT_TIMEOUT = None -TRACE_ENABLED = False - - -def enable_trace(tracable): - """ - turn on/off the tracability. - - tracable: boolean value. if set True, tracability is enabled. - """ - global TRACE_ENABLED - TRACE_ENABLED = tracable - if tracable: - if not LOG.handlers: - LOG.addHandler(logging.StreamHandler()) - LOG.setLevel(logging.DEBUG) - - -def setdefaulttimeout(timeout): - """ - Set the global timeout setting to connect. - - timeout: default socket timeout time. This value is second. - """ - global DEFAULT_TIMEOUT - DEFAULT_TIMEOUT = timeout - - -def getdefaulttimeout(): - """ - Return the global timeout setting(second) to connect. - """ - return DEFAULT_TIMEOUT - - -def _parse_url(url): - """ - parse url and the result is tuple of - (hostname, port, resource path and the flag of secure mode) - - url: url string. - """ - if ":" not in url: - raise ValueError("url is invalid") - - scheme, url = url.split(":", 1) - - parsed = urlparse(url, scheme="http") - if parsed.hostname: - hostname = parsed.hostname - else: - raise ValueError("hostname is invalid") - port = 0 - if parsed.port: - port = parsed.port - - is_secure = False - if scheme == "ws" or scheme == 'http': - if not port: - port = 80 - elif scheme == "wss" or scheme == 'https': - is_secure = True - if not port: - port = 443 - else: - raise ValueError("scheme %s is invalid" % scheme) - - if parsed.path: - resource = parsed.path - else: - resource = "/" - - if parsed.query: - resource += "?" + parsed.query - - return (hostname, port, resource, is_secure) - - -def create_connection(url, timeout=None, **options): - """ - connect to url and return websocket object. - - Connect to url and return the WebSocket object. - Passing optional timeout parameter will set the timeout on the socket. - If no timeout is supplied, the global default timeout setting returned by - getdefauttimeout() is used. - You can customize using 'options'. - If you set "header" list object, you can set your own custom header. - - >>> conn = create_connection("ws://echo.websocket.org/", - ... header=["User-Agent: MyProgram", - ... "x-custom: header"]) - - - timeout: socket timeout time. This value is integer. - if you set None for this value, it means "use DEFAULT_TIMEOUT - value" - - options: current support option is only "header". - if you set header as dict value, the custom HTTP headers are added - """ - sockopt = options.get("sockopt", []) - sslopt = options.get("sslopt", {}) - websock = WebSocket(sockopt=sockopt, sslopt=sslopt) - websock.settimeout(timeout if timeout is not None else DEFAULT_TIMEOUT) - websock.connect(url, **options) - return websock - - -_MAX_INTEGER = (1 << 32) - 1 -_AVAILABLE_KEY_CHARS = list(range(0x21, 0x2f + 1)) + list(range(0x3a, 0x7e + 1)) -_MAX_CHAR_BYTE = (1 << 8) - 1 - -# ref. Websocket gets an update, and it breaks stuff. -# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html - - -def _create_sec_websocket_key(): - uid = uuid.uuid4() - return base64.encodestring(uid.bytes).strip() - - -_HEADERS_TO_CHECK = {"upgrade": "websocket", "connection": "upgrade"} - - -class ABNF(object): - """ - ABNF frame class. - see http://tools.ietf.org/html/rfc5234 - and http://tools.ietf.org/html/rfc6455#section-5.2 - """ - - # operation code values. - OPCODE_CONT = 0x0 - OPCODE_TEXT = 0x1 - OPCODE_BINARY = 0x2 - OPCODE_CLOSE = 0x8 - OPCODE_PING = 0x9 - OPCODE_PONG = 0xa - - # available operation code value tuple - OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE, - OPCODE_PING, OPCODE_PONG) - - # opcode human readable string - OPCODE_MAP = { - OPCODE_CONT: "cont", - OPCODE_TEXT: "text", - OPCODE_BINARY: "binary", - OPCODE_CLOSE: "close", - OPCODE_PING: "ping", - OPCODE_PONG: "pong" - } - - # data length threashold. - LENGTH_7 = 0x7d - LENGTH_16 = 1 << 16 - LENGTH_63 = 1 << 63 - - def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0, - opcode=OPCODE_TEXT, mask=1, data=""): - """ - Constructor for ABNF. - please check RFC for arguments. - """ - self.fin = fin - self.rsv1 = rsv1 - self.rsv2 = rsv2 - self.rsv3 = rsv3 - self.opcode = opcode - self.mask = mask - self.data = data - self.get_mask_key = os.urandom - - def __str__(self): - return "fin=" + str(self.fin) \ - + " opcode=" + str(self.opcode) \ - + " data=" + str(self.data) - - @staticmethod - def create_frame(data, opcode): - """ - create frame to send text, binary and other data. - - data: data to send. This is string value(byte array). - if opcode is OPCODE_TEXT and this value is uniocde, - data value is conveted into unicode string, automatically. - - opcode: operation code. please see OPCODE_XXX. - """ - if opcode == ABNF.OPCODE_TEXT and isinstance(data, str): - data = data.encode() - # mask must be set if send data from client - return ABNF(1, 0, 0, 0, opcode, 1, data) - - def format(self): - """ - format this object to string(byte array) to send data to server. - """ - if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]): - raise ValueError("not 0 or 1") - if self.opcode not in ABNF.OPCODES: - raise ValueError("Invalid OPCODE") - length = len(self.data) - if length >= ABNF.LENGTH_63: - raise ValueError("data is too long") - - frame_header = chr(self.fin << 7 | - self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 | - self.opcode) - if length < ABNF.LENGTH_7: - frame_header += chr(self.mask << 7 | length) - elif length < ABNF.LENGTH_16: - frame_header += chr(self.mask << 7 | 0x7e) - frame_header += struct.pack("!H", length) - else: - frame_header += chr(self.mask << 7 | 0x7f) - frame_header += struct.pack("!Q", length) - - if not self.mask: - return frame_header + self.data - else: - mask_key = self.get_mask_key(4) - return frame_header + self._get_masked(mask_key) - - def _get_masked(self, mask_key): - s = ABNF.mask(mask_key, self.data) - return mask_key + "".join(s) - - @staticmethod - def mask(mask_key, data): - """ - mask or unmask data. Just do xor for each byte - - mask_key: 4 byte string(byte). - - data: data to mask/unmask. - """ - _m = array.array("B", mask_key) - _d = array.array("B", data) - for i, _ in enumerate(_d): - _d[i] ^= _m[i % 4] - return _d.tostring() - - -class WebSocket(object): - """ - Low level WebSocket interface. - This class is based on - The WebSocket protocol draft-hixie-thewebsocketprotocol-76 - http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76 - - We can connect to the websocket server and send/recieve data. - The following example is a echo client. - - >>> import websocket - >>> ws = websocket.WebSocket() - >>> ws.connect("ws://echo.websocket.org") - >>> ws.send("Hello, Server") - >>> ws.recv() - 'Hello, Server' - >>> ws.close() - - get_mask_key: a callable to produce new mask keys, see the set_mask_key - function's docstring for more details - sockopt: values for socket.setsockopt. - sockopt must be tuple and each element is argument of sock.setscokopt. - sslopt: dict object for ssl socket option. - """ - - def __init__(self, get_mask_key=None, sockopt=None, sslopt=None): - """ - Initalize WebSocket object. - """ - if sockopt is None: - sockopt = [] - if sslopt is None: - sslopt = {} - self.connected = False - self.sock = socket.socket() - for opts in sockopt: - self.sock.setsockopt(*opts) - self.sslopt = sslopt - self.get_mask_key = get_mask_key - # Buffers over the packets from the layer beneath until desired amount - # bytes of bytes are received. - self._recv_buffer = [] - # These buffer over the build-up of a single frame. - self._frame_header = None - self._frame_length = None - self._frame_mask = None - self._cont_data = None - - def fileno(self): - """ - Returns sock.fileno() - """ - return self.sock.fileno() - - def set_mask_key(self, func): - """ - set function to create musk key. You can custumize mask key generator. - Mainly, this is for testing purpose. - - func: callable object. the fuct must 1 argument as integer. - The argument means length of mask key. - This func must be return string(byte array), - which length is argument specified. - """ - self.get_mask_key = func - - def gettimeout(self): - """ - Get the websocket timeout(second). - """ - return self.sock.gettimeout() - - def settimeout(self, timeout): - """ - Set the timeout to the websocket. - - timeout: timeout time(second). - """ - self.sock.settimeout(timeout) - - timeout = property(gettimeout, settimeout) - - def connect(self, url, **options): - """ - Connect to url. url is websocket url scheme. ie. ws://host:port/resource - You can customize using 'options'. - If you set "header" dict object, you can set your own custom header. - - >>> ws = WebSocket() - >>> ws.connect("ws://echo.websocket.org/", - ... header={"User-Agent: MyProgram", - ... "x-custom: header"}) - - timeout: socket timeout time. This value is integer. - if you set None for this value, - it means "use DEFAULT_TIMEOUT value" - - options: current support option is only "header". - if you set header as dict value, - the custom HTTP headers are added. - - """ - hostname, port, resource, is_secure = _parse_url(url) - # TODO: we need to support proxy - self.sock.connect((hostname, port)) - if is_secure: - if HAVE_SSL: - if self.sslopt is None: - sslopt = {} - else: - sslopt = self.sslopt - self.sock = ssl.wrap_socket(self.sock, **sslopt) - else: - raise WebSocketException("SSL not available.") - - self._handshake(hostname, port, resource, **options) - - def _handshake(self, host, port, resource, **options): - headers = [] - headers.append("GET %s HTTP/1.1" % resource) - headers.append("Upgrade: websocket") - headers.append("Connection: Upgrade") - if port == 80: - hostport = host - else: - hostport = "%s:%d" % (host, port) - headers.append("Host: %s" % hostport) - - if "origin" in options: - headers.append("Origin: %s" % options["origin"]) - else: - headers.append("Origin: http://%s" % hostport) - - key = _create_sec_websocket_key() - headers.append("Sec-WebSocket-Key: %s" % key) - headers.append("Sec-WebSocket-Version: %s" % VERSION) - if "header" in options: - headers.extend(options["header"]) - - headers.append("") - headers.append("") - - header_str = "\r\n".join(headers) - self._send(header_str) - if TRACE_ENABLED: - LOG.debug("--- request header ---") - LOG.debug(header_str) - LOG.debug("-----------------------") - - status, resp_headers = self._read_headers() - if status == 301: - # Redirect - raise WebsocketRedirect(resp_headers) - if status != 101: - self.close() - raise WebSocketException("Handshake Status %d" % status) - - success = self._validate_header(resp_headers, key) - if not success: - self.close() - raise WebSocketException("Invalid WebSocket Header") - - self.connected = True - - @staticmethod - def _validate_header(headers, key): - for k, v in _HEADERS_TO_CHECK.items(): - r = headers.get(k, None) - if not r: - return False - r = r.lower() - if v != r: - return False - - result = headers.get("sec-websocket-accept", None) - if not result: - return False - result = result.lower() - - value = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" - hashed = base64.encodestring(hashlib.sha1(value).digest()).strip().lower() - return hashed == result - - def _read_headers(self): - status = None - headers = {} - if TRACE_ENABLED: - LOG.debug("--- response header ---") - - while True: - line = self._recv_line() - if line == "\r\n": - break - line = line.strip() - if TRACE_ENABLED: - LOG.debug(line) - if not status: - status_info = line.split(" ", 2) - status = int(status_info[1]) - else: - kv = line.split(":", 1) - if len(kv) == 2: - key, value = kv - headers[key.lower()] = value.strip().lower() - else: - raise WebSocketException("Invalid header") - - if TRACE_ENABLED: - LOG.debug("-----------------------") - - return status, headers - - def send(self, payload, opcode=ABNF.OPCODE_TEXT): - """ - Send the data as string. - - payload: Payload must be utf-8 string or unicoce, - if the opcode is OPCODE_TEXT. - Otherwise, it must be string(byte array) - - opcode: operation code to send. Please see OPCODE_XXX. - """ - frame = ABNF.create_frame(payload, opcode) - if self.get_mask_key: - frame.get_mask_key = self.get_mask_key - data = frame.format() - length = len(data) - if TRACE_ENABLED: - LOG.debug("send: %s", repr(data)) - while data: - l = self._send(data) - data = data[l:] - return length - - def send_binary(self, payload): - """ - send the payload - """ - return self.send(payload, ABNF.OPCODE_BINARY) - - def ping(self, payload=""): - """ - send ping data. - - payload: data payload to send server. - """ - self.send(payload, ABNF.OPCODE_PING) - - def pong(self, payload): - """ - send pong data. - - payload: data payload to send server. - """ - self.send(payload, ABNF.OPCODE_PONG) - - def recv(self): - """ - Receive string data(byte array) from the server. - - return value: string(byte array) value. - """ - _, data = self.recv_data() - return data - - def recv_data(self): - """ - Recieve data with operation code. - - return value: tuple of operation code and string(byte array) value. - """ - while True: - frame = self.recv_frame() - if not frame: - # handle error: - # 'NoneType' object has no attribute 'opcode' - raise WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT): - if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data: - raise WebSocketException("Illegal frame") - if self._cont_data: - self._cont_data[1] += frame.data - else: - self._cont_data = [frame.opcode, frame.data] - if frame.fin: - data = self._cont_data - self._cont_data = None - return data - elif frame.opcode == ABNF.OPCODE_CLOSE: - self.send_close() - return (frame.opcode, None) - elif frame.opcode == ABNF.OPCODE_PING: - self.pong(frame.data) - - def recv_frame(self): - """ - recieve data as frame from server. - - return value: ABNF frame object. - """ - # Header - if self._frame_header is None: - self._frame_header = self._recv_strict(2) - b1 = ord(self._frame_header[0]) - fin = b1 >> 7 & 1 - rsv1 = b1 >> 6 & 1 - rsv2 = b1 >> 5 & 1 - rsv3 = b1 >> 4 & 1 - opcode = b1 & 0xf - b2 = ord(self._frame_header[1]) - has_mask = b2 >> 7 & 1 - # Frame length - if self._frame_length is None: - length_bits = b2 & 0x7f - if length_bits == 0x7e: - length_data = self._recv_strict(2) - self._frame_length = struct.unpack("!H", length_data)[0] - elif length_bits == 0x7f: - length_data = self._recv_strict(8) - self._frame_length = struct.unpack("!Q", length_data)[0] - else: - self._frame_length = length_bits - # Mask - if self._frame_mask is None: - self._frame_mask = self._recv_strict(4) if has_mask else "" - # Payload - payload = self._recv_strict(self._frame_length) - if has_mask: - payload = ABNF.mask(self._frame_mask, payload) - # Reset for next frame - self._frame_header = None - self._frame_length = None - self._frame_mask = None - return ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload) - - - def send_close(self, status=STATUS_NORMAL, reason=""): - """ - send close data to the server. - - status: status code to send. see STATUS_XXX. - - reason: the reason to close. This must be string. - """ - if status < 0 or status >= ABNF.LENGTH_16: - raise ValueError("code is invalid range") - self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE) - - def close(self, status=STATUS_NORMAL, reason=""): - """ - Close Websocket object - - status: status code to send. see STATUS_XXX. - - reason: the reason to close. This must be string. - """ - try: - self.sock.shutdown(socket.SHUT_RDWR) - except Exception: - pass - self._closeInternal() - - def _closeInternal(self): - self.connected = False - self.sock.close() - - def _send(self, data): - try: - return self.sock.send(data.encode('utf-8')) - except socket.timeout as e: - raise WebSocketTimeoutException(e.args[0]) - except Exception as e: - if "timed out" in e.args[0]: - raise WebSocketTimeoutException(e.args[0]) - else: - raise e - - def _recv(self, bufsize): - try: - bytes_ = self.sock.recv(bufsize) - except socket.timeout as e: - raise WebSocketTimeoutException(e.args[0]) - except SSLError as e: - if e.args[0] == "The read operation timed out": - raise WebSocketTimeoutException(e.args[0]) - else: - raise - if not bytes_: - raise WebSocketConnectionClosedException() - return bytes_ - - def _recv_strict(self, bufsize): - shortage = bufsize - sum(len(x) for x in self._recv_buffer) - while shortage > 0: - bytes_ = self._recv(shortage) - self._recv_buffer.append(bytes_) - shortage -= len(bytes_) - unified = "".join(self._recv_buffer) - if shortage == 0: - self._recv_buffer = [] - return unified - else: - self._recv_buffer = [unified[bufsize:]] - return unified[:bufsize] - - def _recv_line(self): - line = [] - while True: - c = self._recv(1) - line.append(c) - if c == "\n": - break - return "".join(line) - - -class WebSocketApp(object): - """ - Higher level of APIs are provided. - The interface is like JavaScript WebSocket object. - """ - def __init__(self, url, header=None, - on_open=None, on_message=None, on_error=None, - on_close=None, keep_running=True, get_mask_key=None): - """ - url: websocket url. - header: custom header for websocket handshake. - on_open: callable object which is called at opening websocket. - this function has one argument. The arugment is this class object. - on_message: callbale object which is called when recieved data. - on_message has 2 arguments. - The 1st arugment is this class object. - The passing 2nd arugment is utf-8 string which we get from the server. - on_error: callable object which is called when we get error. - on_error has 2 arguments. - The 1st arugment is this class object. - The passing 2nd arugment is exception object. - on_close: callable object which is called when closed the connection. - this function has one argument. The arugment is this class object. - keep_running: a boolean flag indicating whether the app's main loop should - keep running, defaults to True - get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's - docstring for more information - """ - self.url = url - self.header = [] if header is None else header - self.on_open = on_open - self.on_message = on_message - self.on_error = on_error - self.on_close = on_close - self.keep_running = keep_running - self.get_mask_key = get_mask_key - self.sock = None - - def send(self, data, opcode=ABNF.OPCODE_TEXT): - """ - send message. - data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode. - opcode: operation code of data. default is OPCODE_TEXT. - """ - if self.sock.send(data, opcode) == 0: - raise WebSocketConnectionClosedException() - - def close(self): - """ - close websocket connection. - """ - self.keep_running = False - if self.sock != None: - self.sock.close() - - def _send_ping(self, interval): - while True: - for _ in range(interval): - app.APP.monitor.waitForAbort(1) - if not self.keep_running: - return - self.sock.ping() - - def run_forever(self, sockopt=None, sslopt=None, ping_interval=0): - """ - run event loop for WebSocket framework. - This loop is infinite loop and is alive during websocket is available. - sockopt: values for socket.setsockopt. - sockopt must be tuple and each element is argument of - sock.setscokopt. - sslopt: ssl socket optional dict. - ping_interval: automatically send "ping" command every specified - period(second) - if set to 0, not send automatically. - """ - if sockopt is None: - sockopt = [] - if sslopt is None: - sslopt = {} - if self.sock: - raise WebSocketException("socket is already opened") - thread = None - self.keep_running = True - - try: - self.sock = WebSocket(self.get_mask_key, - sockopt=sockopt, - sslopt=sslopt) - self.sock.settimeout(DEFAULT_TIMEOUT) - self.sock.connect(self.url, header=self.header) - self._callback(self.on_open) - - if ping_interval: - thread = threading.Thread(target=self._send_ping, - args=(ping_interval,)) - thread.setDaemon(True) - thread.start() - - while self.keep_running: - try: - data = self.sock.recv() - if data is None or self.keep_running is False: - break - self._callback(self.on_message, data) - except Exception as e: - if "timed out" not in e.args[0]: - raise e - - except Exception as e: - self._callback(self.on_error, e) - finally: - if thread: - self.keep_running = False - self.sock.close() - self._callback(self.on_close) - self.sock = None - - def _callback(self, callback, *args): - if callback: - try: - callback(self, *args) - except Exception as e: - LOG.error(e) - _, _, tb = sys.exc_info() - traceback.print_tb(tb) - - -if __name__ == "__main__": - enable_trace(True) - WEBSOCKET = create_connection("ws://echo.websocket.org/") - LOG.info("Sending 'Hello, World'...") - WEBSOCKET.send("Hello, World") - LOG.info("Sent") - LOG.info("Receiving...") - RESULT = WEBSOCKET.recv() - LOG.info("Received '%s'", RESULT) - WEBSOCKET.close() diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index 17b2b513..816e8630 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -1,57 +1,158 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- from logging import getLogger -from json import loads +import json -from . import backgroundthread, websocket, utils, companion, app, variables as v +from . import websocket +from . import backgroundthread, app, variables as v, utils, companion -############################################################################### +log = getLogger('PLEX.websocket') -LOG = getLogger('PLEX.websocket_client') +PMS_PATH = '/:/websockets/notifications' -############################################################################### +PMS_INTERESTING_MESSAGE_TYPES = ('playing', 'timeline', 'activity') +SETTINGS_STRING = '_status' -class WebSocket(backgroundthread.KillableThread): - opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY) - status_setting = None +def get_pms_uri(): + uri = app.CONN.server + if not uri: + return + # Get the appropriate prefix for the websocket + if uri.startswith('https'): + uri = "wss%s" % uri[5:] + else: + uri = "ws%s" % uri[4:] + uri += PMS_PATH + log.debug('uri to connect pms websocket: %s', uri) + if app.ACCOUNT.pms_token: + uri += '?X-Plex-Token=' + app.ACCOUNT.pms_token + return uri - def __init__(self): - self.ws = None - self.redirect_uri = None - self.sleeptime = 0.0 - super(WebSocket, self).__init__() - def close_websocket(self): - if self.ws is not None: - self.ws.close() - self.ws = None +def get_alexa_uri(): + if not app.ACCOUNT.plex_token: + return + return (f'wss://pubsub.plex.tv/sub/websockets/{app.ACCOUNT.plex_user_id}/' + f'{v.PKC_MACHINE_IDENTIFIER}?' + f'X-Plex-Token={app.ACCOUNT.plex_token}') - def process(self, opcode, message): - raise NotImplementedError - def receive(self, ws): - # Not connected yet - if ws is None: - raise websocket.WebSocketConnectionClosedException +def pms_on_message(ws, message): + """ + Called when we receive a message from the PMS, e.g. when a new library + item has been added. + """ + try: + message = json.loads(message) + except ValueError as err: + log.error('Error decoding PMS websocket message: %s', err) + log.error('message: %s', message) + return + try: + message = message['NotificationContainer'] + typus = message['type'] + except KeyError: + log.error('Could not parse PMS message: %s', message) + return + # Triage + if typus not in PMS_INTERESTING_MESSAGE_TYPES: + # Drop everything we're not interested in + return + else: + # Put PMS message on queue and let libsync take care of it + app.APP.websocket_queue.put(message) - frame = ws.recv_frame() - if not frame: - raise websocket.WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in self.opcode_data: - return frame.opcode, frame.data - elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: - ws.send_close() - return frame.opcode, None - elif frame.opcode == websocket.ABNF.OPCODE_PING: - ws.pong("Hi!") - return None, None +def alexa_on_message(ws, message): + """ + Called when we receive a message from Alexa + """ + log.debug('alexa message received: %s', message) + try: + message = utils.etree.fromstring(message) + except Exception as err: + log.error('Error decoding message from Alexa: %s %s', type(err), err) + log.error('message from Alexa: %s', message) + return + try: + if message.attrib['command'] == 'processRemoteControlCommand': + message = message[0] + else: + log.error('Unknown Alexa message received: %s', message) + return + companion.process_command(message.attrib['path'][1:], message.attrib) + except Exception as err: + log.exception('Could not parse Alexa message, error: %s %s', + type(err), err) + log.error('message: %s', message) - def getUri(self): - raise NotImplementedError - def _sleep_cycle(self): +def on_error(ws, error): + status = ws.name + SETTINGS_STRING + if isinstance(error, IOError): + # We are probably offline + log.debug('%s: IOError connecting', ws.name) + # Status = IOError - not connected + utils.settings(status, value=utils.lang(39092)) + ws.sleep_cycle() + elif isinstance(error, websocket.WebSocketTimeoutException): + log.debug('%s: WebSocketTimeoutException', ws.name) + # Status = 'Timeout - not connected' + utils.settings(status, value=utils.lang(39091)) + ws.sleep_cycle() + elif isinstance(error, websocket.WebSocketConnectionClosedException): + log.debug('%s: WebSocketConnectionClosedException', ws.name) + # Status = Not connected + utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208)) + elif isinstance(error, websocket.WebSocketBadStatusException): + # Most likely Alexa not connecting, throwing a 403 + log.debug('%s: got a bad HTTP status: %s', ws.name, error) + # Status = + utils.settings(status, value=str(error)) + ws.sleep_cycle() + elif isinstance(error, websocket.WebSocketException): + log.error('%s: got another websocket exception %s: %s', + ws.name, type(error), error) + # Status = Error + utils.settings(status, value=utils.lang(257)) + ws.sleep_cycle() + elif isinstance(error, SystemExit): + log.debug('%s: SystemExit detected', ws.name) + # Status = Not connected + utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208)) + else: + log.exception('%s: got an unexpected exception of type %s: %s', + ws.name, type(error), error) + # Status = Error + utils.settings(status, value=utils.lang(257)) + raise RuntimeError + + +def on_close(ws): + """ + This does not seem to get called by our websocket client :-( + """ + log.debug('%s: connection closed', ws.name) + # Status = Not connected + utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208)) + + +def on_open(ws): + log.debug('%s: connected', ws.name) + # Status = Connected + utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(13296)) + ws.sleeptime = 0 + + +class PlexWebSocketApp(websocket.WebSocketApp, + backgroundthread.KillableThread): + def __init__(self, **kwargs): + self.sleeptime = 0 + backgroundthread.KillableThread.__init__(self) + websocket.WebSocketApp.__init__(self, self.get_uri(), **kwargs) + + def sleep_cycle(self): """ Sleeps for 2^self.sleeptime where sleeping period will be doubled with each unsuccessful connection attempt. @@ -59,224 +160,133 @@ class WebSocket(backgroundthread.KillableThread): """ self.sleep(2 ** self.sleeptime) if self.sleeptime < 6: - self.sleeptime += 1.0 + self.sleeptime += 1 + + def suspend(self, block=False, timeout=None): + """ + Call this method from another thread to suspend this websocket thread + """ + self.close() + backgroundthread.KillableThread.suspend(self, block, timeout) + + def cancel(self): + """ + Call this method from another thread to cancel this websocket thread + """ + self.close() + backgroundthread.KillableThread.cancel(self) def run(self): - LOG.info("----===## Starting %s ##===----", self.__class__.__name__) + """ + Ensure that sockets will be closed no matter what + """ + log.info("----===## Starting %s ##===----", self.name) app.APP.register_thread(self) try: self._run() + except RuntimeError: + pass + except Exception as err: + log.exception('Exception of type %s occured: %s', type(err), err) finally: - self.close_websocket() + self.close() + # Status = Not connected + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(15208)) app.APP.deregister_thread(self) - LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__) + log.info("----===## %s stopped ##===----", self.name) def _run(self): while not self.should_cancel(): # In the event the server goes offline - if self.should_suspend(): - # Set in service.py - self.close_websocket() - # Status = 'Suspended - not connected' - utils.settings(self.status_setting, value=utils.lang(39090)) + while self.should_suspend(): + # We will be caught in this loop if either another thread + # called the suspend() method, thus setting _suspended = True + # OR if there any other conditions to not open a websocket + # connection - see methods should_suspend() below + # Status = Suspended - not connected + self.set_suspension_settings_status() if self.wait_while_suspended(): # Abort was requested while waiting. We should exit return - try: - self.process(*self.receive(self.ws)) - except websocket.WebSocketTimeoutException: - # No worries if read timed out - pass - except websocket.WebSocketConnectionClosedException: - LOG.debug("%s: connection closed, (re)connecting", - self.__class__.__name__) - uri, sslopt = self.getUri() - try: - # Low timeout - let's us shut this thread down! - self.ws = websocket.create_connection( - uri, - timeout=1, - sslopt=sslopt, - enable_multithread=True) - except IOError: - # Server is probably offline - LOG.debug("%s: IOError connecting", self.__class__.__name__) - self.ws = None - # Status = IOError - not connected - utils.settings(self.status_setting, - value=utils.lang(39092)) - self._sleep_cycle() - except websocket.WebSocketTimeoutException: - LOG.debug("%s: WebSocketTimeoutException", self.__class__.__name__) - self.ws = None - # Status = 'Timeout - not connected' - utils.settings(self.status_setting, - value=utils.lang(39091)) - self._sleep_cycle() - except websocket.WebsocketRedirect as e: - LOG.debug('301 redirect detected: %s', e) - self.redirect_uri = e.headers.get('location', - e.headers.get('Location')) - if self.redirect_uri: - self.redirect_uri = self.redirect_uri.decode('utf-8') - self.ws = None - self._sleep_cycle() - except websocket.WebSocketException as e: - LOG.debug('%s: WebSocketException: %s', self.__class__.__name__, e) - self.ws = None - # Status = Error - utils.settings(self.status_setting, - value=utils.lang(257)) - self._sleep_cycle() - except Exception as e: - LOG.error('%s: Unknown exception encountered when ' - 'connecting: %s', self.__class__.__name__, e) - import traceback - LOG.error("%s: Traceback:\n%s", - self.__class__.__name__, traceback.format_exc()) - self.ws = None - # Status = Error - utils.settings(self.status_setting, - value=utils.lang(257)) - self._sleep_cycle() - else: - self.sleeptime = 0.0 - # Status = Connected - utils.settings(self.status_setting, - value=utils.lang(13296)) - except Exception as e: - LOG.error("%s: Unknown exception encountered: %s", - self.__class__.__name__, e) - import traceback - LOG.error("%s: Traceback:\n%s", - self.__class__.__name__, traceback.format_exc()) - self.close_websocket() - # Status = Error - utils.settings(self.status_setting, value=utils.lang(257)) + if not self._suspended: + # because wait_while_suspended will return instantly if + # this thread did not get suspended from another thread + self.sleep_cycle() + self.url = self.get_uri() + if not self.url: + self.sleep_cycle() + continue + self.run_forever() -class PMS_Websocket(WebSocket): - """ - Websocket connection with the PMS for Plex Companion - """ - status_setting = 'pms_websocket_status' +class PMSWebsocketApp(PlexWebSocketApp): + name = 'pms_websocket' + + def get_uri(self): + return get_pms_uri() def should_suspend(self): """ - Returns True if the thread is suspended. + Returns True if the thread needs to suspend. """ - suspend = self._suspended or app.SYNC.background_sync_disabled - if suspend: - # This thread needs to clear the Event() _is_not_suspended itself! - self.suspend() - return suspend + return (self._suspended or + utils.settings('enableBackgroundSync') != 'true') - def getUri(self): - if self.redirect_uri: - uri = self.redirect_uri - self.redirect_uri = None + def set_suspension_settings_status(self): + if utils.settings('enableBackgroundSync') != 'true': + # Status = Disabled + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(24023)) else: - server = app.CONN.server - # Get the appropriate prefix for the websocket - if server.startswith('https'): - server = "wss%s" % server[5:] - else: - server = "ws%s" % server[4:] - uri = "%s/:/websockets/notifications" % server - if app.ACCOUNT.pms_token: - uri += '?X-Plex-Token=%s' % app.ACCOUNT.pms_token - sslopt = {} - LOG.debug("%s: Uri: %s, sslopt: %s", - self.__class__.__name__, uri, sslopt) - return uri, sslopt - - def process(self, opcode, message): - if opcode not in self.opcode_data: - return - - try: - message = loads(message) - except ValueError as err: - LOG.error('%s: Error decoding message from websocket: %s', - self.__class__.__name__, err) - LOG.error(message) - return - try: - message = message['NotificationContainer'] - except KeyError: - LOG.error('%s: Could not parse PMS message: %s', - self.__class__.__name__, message) - return - # Triage - typus = message.get('type') - if typus is None: - LOG.error('%s: No message type, dropping message: %s', - self.__class__.__name__, message) - return - LOG.debug('%s: Received message from PMS server: %s', - self.__class__.__name__, message) - # Drop everything we're not interested in - if typus not in ('playing', 'timeline', 'activity'): - return - else: - # Put PMS message on queue and let libsync take care of it - app.APP.websocket_queue.put(message) + # Status = 'Suspended - not connected' + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(39093)) -class Alexa_Websocket(WebSocket): - """ - Websocket connection to talk to Amazon Alexa. - """ - status_setting = 'alexa_websocket_status' +class AlexaWebsocketApp(PlexWebSocketApp): + name = 'alexa_websocket' + + def get_uri(self): + return get_alexa_uri() def should_suspend(self): """ - Overwrite method since we need to check for plex token + Returns True if the thread needs to suspend. """ - suspend = self._suspended or \ - not app.SYNC.enable_alexa or \ + return self._suspended or \ + utils.settings('enable_alexa') != 'true' or \ app.ACCOUNT.restricted_user or \ not app.ACCOUNT.plex_token - if suspend: - # This thread needs to clear the Event() _is_not_suspended itself! - self.suspend() - return suspend - def getUri(self): - if self.redirect_uri: - uri = self.redirect_uri - self.redirect_uri = None + def set_suspension_settings_status(self): + if utils.settings('enable_alexa') != 'true': + # Status = Disabled + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(24023)) + elif app.ACCOUNT.restricted_user: + # Status = Managed Plex User - not connected + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(39094)) + elif not app.ACCOUNT.plex_token: + # Status = Not logged in to plex.tv + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(39226)) else: - uri = ('wss://pubsub.plex.tv/sub/websockets/%s/%s?X-Plex-Token=%s' - % (app.ACCOUNT.plex_user_id, - v.PKC_MACHINE_IDENTIFIER, - app.ACCOUNT.plex_token)) - sslopt = {} - LOG.debug("%s: Uri: %s, sslopt: %s", - self.__class__.__name__, uri, sslopt) - return uri, sslopt + # Status = 'Suspended - not connected' + utils.settings(self.name + SETTINGS_STRING, + value=utils.lang(39093)) - def process(self, opcode, message): - if opcode not in self.opcode_data: - return - LOG.debug('%s: Received the following message from Alexa:', - self.__class__.__name__) - LOG.debug('%s: %s', self.__class__.__name__, message) - try: - message = utils.etree.fromstring(message) - except Exception as ex: - LOG.error('%s: Error decoding message from Alexa: %s', - self.__class__.__name__, ex) - return - try: - if message.attrib['command'] == 'processRemoteControlCommand': - message = message[0] - else: - LOG.error('%s: Unknown Alexa message received', - self.__class__.__name__) - return - except Exception: - LOG.error('%s: Could not parse Alexa message', - self.__class__.__name__) - return - companion.process_command(message.attrib['path'][1:], message.attrib) + +def get_pms_websocketapp(): + return PMSWebsocketApp(on_open=on_open, + on_message=pms_on_message, + on_error=on_error, + on_close=on_close) + + +def get_alexa_websocketapp(): + return AlexaWebsocketApp(on_open=on_open, + on_message=alexa_on_message, + on_error=on_error, + on_close=on_close)