Adapt websocket client logic

This commit is contained in:
croneter 2021-03-17 21:13:11 +01:00
parent 3d139b0929
commit 1c4b15e357
6 changed files with 254 additions and 1161 deletions

View file

@ -1134,11 +1134,6 @@ msgctxt "#39089"
msgid "Alexa connection status:"
msgstr ""
# PKC Settings - Connection - Background sync connection status
msgctxt "#39090"
msgid "Suspended - not connected"
msgstr ""
# PKC Settings - Connection - Background sync connection status
msgctxt "#39091"
msgid "Timeout - not connected"
@ -1149,6 +1144,16 @@ msgctxt "#39092"
msgid "IOError - not connected"
msgstr ""
# PKC Settings - Connection - Background sync connection status
msgctxt "#39093"
msgid "Suspended - not connected"
msgstr ""
# PKC Settings - Connection - Background sync connection status
msgctxt "#39094"
msgid "Managed Plex User - not connected"
msgstr ""
msgctxt "#39200"
msgid "Log-out Plex Home User "
msgstr ""

View file

@ -57,8 +57,6 @@ class Sync(object):
# How often shall we sync?
self.full_sync_intervall = None
# Background Sync disabled?
self.background_sync_disabled = None
# How long shall we wait with synching a new item to make sure Plex got all
# metadata?
self.backgroundsync_saftymargin = None
@ -81,7 +79,6 @@ class Sync(object):
# List of section_ids we're synching to Kodi - will be automatically
# re-built if sections are set a-new
self.section_ids = set()
self.enable_alexa = None
self.load()
@ -122,8 +119,6 @@ class Sync(object):
Any settings unrelated to syncs to the Kodi database - can thus be
safely reset without a Kodi reboot
"""
self.background_sync_disabled = utils.settings('enableBackgroundSync') == 'false'
self.enable_alexa = utils.settings('enable_alexa') == 'true'
self.sync_dialog = utils.settings('dbSyncIndicator') == 'true'
self.full_sync_intervall = int(utils.settings('fullSyncInterval')) * 60
self.backgroundsync_saftymargin = int(utils.settings('backgroundsync_saftyMargin'))

View file

@ -98,7 +98,8 @@ class Service(object):
self.welcome_msg = True
self.connection_check_counter = 0
self.setup = None
self.alexa = None
self.pms_ws = None
self.alexa_ws = None
self.playqueue = None
# Flags for other threads
self.connection_check_running = False
@ -446,8 +447,8 @@ class Service(object):
self.setup.setup()
# Initialize important threads
self.ws = websocket_client.PMS_Websocket()
self.alexa = websocket_client.Alexa_Websocket()
self.pms_ws = websocket_client.get_pms_websocketapp()
self.alexa_ws = websocket_client.get_alexa_websocketapp()
self.sync = sync.Sync()
self.plexcompanion = plex_companion.PlexCompanion()
self.playqueue = playqueue.PlayqueueMonitor()
@ -547,11 +548,11 @@ class Service(object):
continue
elif not self.startup_completed:
self.startup_completed = True
self.ws.start()
self.pms_ws.start()
self.sync.start()
self.plexcompanion.start()
self.playqueue.start()
self.alexa.start()
self.alexa_ws.start()
elif app.APP.is_playing:
skip_plex_intro.check()

View file

@ -227,7 +227,7 @@ class Sync(backgroundthread.KillableThread):
not app.APP.is_playing_video):
LOG.info('Doing scheduled full library scan')
self.start_library_sync()
elif not app.SYNC.background_sync_disabled:
else:
# Check back whether we should process something Only do
# this once a while (otherwise, potentially many screen
# refreshes lead to flickering)

View file

@ -1,917 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
websocket - WebSocket client library for Python
Copyright (C) 2010 Hiroki Ohtani(liris)
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
import socket
try:
import ssl
from ssl import SSLError
HAVE_SSL = True
except ImportError:
class SSLError(Exception):
"""
Dummy class of SSLError for ssl none-support environment.
"""
pass
HAVE_SSL = False
from urlparse import urlparse
import os
import array
import struct
import uuid
import hashlib
import base64
import threading
import logging
import traceback
import sys
from . import utils, app
###############################################################################
LOG = logging.getLogger('PLEX.websocket')
###############################################################################
"""
websocket python client.
=========================
This version support only hybi-13.
Please see http://tools.ietf.org/html/rfc6455 for protocol.
"""
# websocket supported version.
VERSION = 13
# closing frame status codes.
STATUS_NORMAL = 1000
STATUS_GOING_AWAY = 1001
STATUS_PROTOCOL_ERROR = 1002
STATUS_UNSUPPORTED_DATA_TYPE = 1003
STATUS_STATUS_NOT_AVAILABLE = 1005
STATUS_ABNORMAL_CLOSED = 1006
STATUS_INVALID_PAYLOAD = 1007
STATUS_POLICY_VIOLATION = 1008
STATUS_MESSAGE_TOO_BIG = 1009
STATUS_INVALID_EXTENSION = 1010
STATUS_UNEXPECTED_CONDITION = 1011
STATUS_TLS_HANDSHAKE_ERROR = 1015
class WebSocketException(Exception):
"""
websocket exeception class.
"""
pass
class WebSocketConnectionClosedException(WebSocketException):
"""
If remote host closed the connection or some network error happened,
this exception will be raised.
"""
pass
class WebSocketTimeoutException(WebSocketException):
"""
WebSocketTimeoutException will be raised at socket timeout during read and
write data.
"""
pass
class WebsocketRedirect(WebSocketException):
"""
WebsocketRedirect will be raised if a status code 301 is returned
The Exception will be instantiated with a dict containing all response
headers; which should contain the redirect address under the key 'location'
Access the headers via the attribute headers
"""
def __init__(self, headers):
self.headers = headers
super(WebsocketRedirect, self).__init__()
DEFAULT_TIMEOUT = None
TRACE_ENABLED = False
def enable_trace(tracable):
"""
turn on/off the tracability.
tracable: boolean value. if set True, tracability is enabled.
"""
global TRACE_ENABLED
TRACE_ENABLED = tracable
if tracable:
if not LOG.handlers:
LOG.addHandler(logging.StreamHandler())
LOG.setLevel(logging.DEBUG)
def setdefaulttimeout(timeout):
"""
Set the global timeout setting to connect.
timeout: default socket timeout time. This value is second.
"""
global DEFAULT_TIMEOUT
DEFAULT_TIMEOUT = timeout
def getdefaulttimeout():
"""
Return the global timeout setting(second) to connect.
"""
return DEFAULT_TIMEOUT
def _parse_url(url):
"""
parse url and the result is tuple of
(hostname, port, resource path and the flag of secure mode)
url: url string.
"""
if ":" not in url:
raise ValueError("url is invalid")
scheme, url = url.split(":", 1)
parsed = urlparse(url, scheme="http")
if parsed.hostname:
hostname = parsed.hostname
else:
raise ValueError("hostname is invalid")
port = 0
if parsed.port:
port = parsed.port
is_secure = False
if scheme == "ws" or scheme == 'http':
if not port:
port = 80
elif scheme == "wss" or scheme == 'https':
is_secure = True
if not port:
port = 443
else:
raise ValueError("scheme %s is invalid" % scheme)
if parsed.path:
resource = parsed.path
else:
resource = "/"
if parsed.query:
resource += "?" + parsed.query
return (hostname, port, resource, is_secure)
def create_connection(url, timeout=None, **options):
"""
connect to url and return websocket object.
Connect to url and return the WebSocket object.
Passing optional timeout parameter will set the timeout on the socket.
If no timeout is supplied, the global default timeout setting returned by
getdefauttimeout() is used.
You can customize using 'options'.
If you set "header" list object, you can set your own custom header.
>>> conn = create_connection("ws://echo.websocket.org/",
... header=["User-Agent: MyProgram",
... "x-custom: header"])
timeout: socket timeout time. This value is integer.
if you set None for this value, it means "use DEFAULT_TIMEOUT
value"
options: current support option is only "header".
if you set header as dict value, the custom HTTP headers are added
"""
sockopt = options.get("sockopt", [])
sslopt = options.get("sslopt", {})
websock = WebSocket(sockopt=sockopt, sslopt=sslopt)
websock.settimeout(timeout if timeout is not None else DEFAULT_TIMEOUT)
websock.connect(url, **options)
return websock
_MAX_INTEGER = (1 << 32) - 1
_AVAILABLE_KEY_CHARS = range(0x21, 0x2f + 1) + range(0x3a, 0x7e + 1)
_MAX_CHAR_BYTE = (1 << 8) - 1
# ref. Websocket gets an update, and it breaks stuff.
# http://axod.blogspot.com/2010/06/websocket-gets-update-and-it-breaks.html
def _create_sec_websocket_key():
uid = uuid.uuid4()
return base64.encodestring(uid.bytes).strip()
_HEADERS_TO_CHECK = {"upgrade": "websocket", "connection": "upgrade"}
class ABNF(object):
"""
ABNF frame class.
see http://tools.ietf.org/html/rfc5234
and http://tools.ietf.org/html/rfc6455#section-5.2
"""
# operation code values.
OPCODE_CONT = 0x0
OPCODE_TEXT = 0x1
OPCODE_BINARY = 0x2
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xa
# available operation code value tuple
OPCODES = (OPCODE_CONT, OPCODE_TEXT, OPCODE_BINARY, OPCODE_CLOSE,
OPCODE_PING, OPCODE_PONG)
# opcode human readable string
OPCODE_MAP = {
OPCODE_CONT: "cont",
OPCODE_TEXT: "text",
OPCODE_BINARY: "binary",
OPCODE_CLOSE: "close",
OPCODE_PING: "ping",
OPCODE_PONG: "pong"
}
# data length threashold.
LENGTH_7 = 0x7d
LENGTH_16 = 1 << 16
LENGTH_63 = 1 << 63
def __init__(self, fin=0, rsv1=0, rsv2=0, rsv3=0,
opcode=OPCODE_TEXT, mask=1, data=""):
"""
Constructor for ABNF.
please check RFC for arguments.
"""
self.fin = fin
self.rsv1 = rsv1
self.rsv2 = rsv2
self.rsv3 = rsv3
self.opcode = opcode
self.mask = mask
self.data = data
self.get_mask_key = os.urandom
def __str__(self):
return "fin=" + str(self.fin) \
+ " opcode=" + str(self.opcode) \
+ " data=" + str(self.data)
@staticmethod
def create_frame(data, opcode):
"""
create frame to send text, binary and other data.
data: data to send. This is string value(byte array).
if opcode is OPCODE_TEXT and this value is uniocde,
data value is conveted into unicode string, automatically.
opcode: operation code. please see OPCODE_XXX.
"""
if opcode == ABNF.OPCODE_TEXT and isinstance(data, unicode):
data = utils.try_encode(data)
# mask must be set if send data from client
return ABNF(1, 0, 0, 0, opcode, 1, data)
def format(self):
"""
format this object to string(byte array) to send data to server.
"""
if any(x not in (0, 1) for x in [self.fin, self.rsv1, self.rsv2, self.rsv3]):
raise ValueError("not 0 or 1")
if self.opcode not in ABNF.OPCODES:
raise ValueError("Invalid OPCODE")
length = len(self.data)
if length >= ABNF.LENGTH_63:
raise ValueError("data is too long")
frame_header = chr(self.fin << 7 |
self.rsv1 << 6 | self.rsv2 << 5 | self.rsv3 << 4 |
self.opcode)
if length < ABNF.LENGTH_7:
frame_header += chr(self.mask << 7 | length)
elif length < ABNF.LENGTH_16:
frame_header += chr(self.mask << 7 | 0x7e)
frame_header += struct.pack("!H", length)
else:
frame_header += chr(self.mask << 7 | 0x7f)
frame_header += struct.pack("!Q", length)
if not self.mask:
return frame_header + self.data
else:
mask_key = self.get_mask_key(4)
return frame_header + self._get_masked(mask_key)
def _get_masked(self, mask_key):
s = ABNF.mask(mask_key, self.data)
return mask_key + "".join(s)
@staticmethod
def mask(mask_key, data):
"""
mask or unmask data. Just do xor for each byte
mask_key: 4 byte string(byte).
data: data to mask/unmask.
"""
_m = array.array("B", mask_key)
_d = array.array("B", data)
for i in xrange(len(_d)):
_d[i] ^= _m[i % 4]
return _d.tostring()
class WebSocket(object):
"""
Low level WebSocket interface.
This class is based on
The WebSocket protocol draft-hixie-thewebsocketprotocol-76
http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76
We can connect to the websocket server and send/recieve data.
The following example is a echo client.
>>> import websocket
>>> ws = websocket.WebSocket()
>>> ws.connect("ws://echo.websocket.org")
>>> ws.send("Hello, Server")
>>> ws.recv()
'Hello, Server'
>>> ws.close()
get_mask_key: a callable to produce new mask keys, see the set_mask_key
function's docstring for more details
sockopt: values for socket.setsockopt.
sockopt must be tuple and each element is argument of sock.setscokopt.
sslopt: dict object for ssl socket option.
"""
def __init__(self, get_mask_key=None, sockopt=None, sslopt=None):
"""
Initalize WebSocket object.
"""
if sockopt is None:
sockopt = []
if sslopt is None:
sslopt = {}
self.connected = False
self.sock = socket.socket()
for opts in sockopt:
self.sock.setsockopt(*opts)
self.sslopt = sslopt
self.get_mask_key = get_mask_key
# Buffers over the packets from the layer beneath until desired amount
# bytes of bytes are received.
self._recv_buffer = []
# These buffer over the build-up of a single frame.
self._frame_header = None
self._frame_length = None
self._frame_mask = None
self._cont_data = None
def fileno(self):
"""
Returns sock.fileno()
"""
return self.sock.fileno()
def set_mask_key(self, func):
"""
set function to create musk key. You can custumize mask key generator.
Mainly, this is for testing purpose.
func: callable object. the fuct must 1 argument as integer.
The argument means length of mask key.
This func must be return string(byte array),
which length is argument specified.
"""
self.get_mask_key = func
def gettimeout(self):
"""
Get the websocket timeout(second).
"""
return self.sock.gettimeout()
def settimeout(self, timeout):
"""
Set the timeout to the websocket.
timeout: timeout time(second).
"""
self.sock.settimeout(timeout)
timeout = property(gettimeout, settimeout)
def connect(self, url, **options):
"""
Connect to url. url is websocket url scheme. ie. ws://host:port/resource
You can customize using 'options'.
If you set "header" dict object, you can set your own custom header.
>>> ws = WebSocket()
>>> ws.connect("ws://echo.websocket.org/",
... header={"User-Agent: MyProgram",
... "x-custom: header"})
timeout: socket timeout time. This value is integer.
if you set None for this value,
it means "use DEFAULT_TIMEOUT value"
options: current support option is only "header".
if you set header as dict value,
the custom HTTP headers are added.
"""
hostname, port, resource, is_secure = _parse_url(url)
# TODO: we need to support proxy
self.sock.connect((hostname, port))
if is_secure:
if HAVE_SSL:
if self.sslopt is None:
sslopt = {}
else:
sslopt = self.sslopt
self.sock = ssl.wrap_socket(self.sock, **sslopt)
else:
raise WebSocketException("SSL not available.")
self._handshake(hostname, port, resource, **options)
def _handshake(self, host, port, resource, **options):
headers = []
headers.append("GET %s HTTP/1.1" % resource)
headers.append("Upgrade: websocket")
headers.append("Connection: Upgrade")
if port == 80:
hostport = host
else:
hostport = "%s:%d" % (host, port)
headers.append("Host: %s" % hostport)
if "origin" in options:
headers.append("Origin: %s" % options["origin"])
else:
headers.append("Origin: http://%s" % hostport)
key = _create_sec_websocket_key()
headers.append("Sec-WebSocket-Key: %s" % key)
headers.append("Sec-WebSocket-Version: %s" % VERSION)
if "header" in options:
headers.extend(options["header"])
headers.append("")
headers.append("")
header_str = "\r\n".join(headers)
self._send(header_str)
if TRACE_ENABLED:
LOG.debug("--- request header ---")
LOG.debug(header_str)
LOG.debug("-----------------------")
status, resp_headers = self._read_headers()
if status == 301:
# Redirect
raise WebsocketRedirect(resp_headers)
if status != 101:
self.close()
raise WebSocketException("Handshake Status %d" % status)
success = self._validate_header(resp_headers, key)
if not success:
self.close()
raise WebSocketException("Invalid WebSocket Header")
self.connected = True
@staticmethod
def _validate_header(headers, key):
for k, v in _HEADERS_TO_CHECK.iteritems():
r = headers.get(k, None)
if not r:
return False
r = r.lower()
if v != r:
return False
result = headers.get("sec-websocket-accept", None)
if not result:
return False
result = result.lower()
value = key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
hashed = base64.encodestring(hashlib.sha1(value).digest()).strip().lower()
return hashed == result
def _read_headers(self):
status = None
headers = {}
if TRACE_ENABLED:
LOG.debug("--- response header ---")
while True:
line = self._recv_line()
if line == "\r\n":
break
line = line.strip()
if TRACE_ENABLED:
LOG.debug(line)
if not status:
status_info = line.split(" ", 2)
status = int(status_info[1])
else:
kv = line.split(":", 1)
if len(kv) == 2:
key, value = kv
headers[key.lower()] = value.strip().lower()
else:
raise WebSocketException("Invalid header")
if TRACE_ENABLED:
LOG.debug("-----------------------")
return status, headers
def send(self, payload, opcode=ABNF.OPCODE_TEXT):
"""
Send the data as string.
payload: Payload must be utf-8 string or unicoce,
if the opcode is OPCODE_TEXT.
Otherwise, it must be string(byte array)
opcode: operation code to send. Please see OPCODE_XXX.
"""
frame = ABNF.create_frame(payload, opcode)
if self.get_mask_key:
frame.get_mask_key = self.get_mask_key
data = frame.format()
length = len(data)
if TRACE_ENABLED:
LOG.debug("send: %s", repr(data))
while data:
l = self._send(data)
data = data[l:]
return length
def send_binary(self, payload):
"""
send the payload
"""
return self.send(payload, ABNF.OPCODE_BINARY)
def ping(self, payload=""):
"""
send ping data.
payload: data payload to send server.
"""
self.send(payload, ABNF.OPCODE_PING)
def pong(self, payload):
"""
send pong data.
payload: data payload to send server.
"""
self.send(payload, ABNF.OPCODE_PONG)
def recv(self):
"""
Receive string data(byte array) from the server.
return value: string(byte array) value.
"""
_, data = self.recv_data()
return data
def recv_data(self):
"""
Recieve data with operation code.
return value: tuple of operation code and string(byte array) value.
"""
while True:
frame = self.recv_frame()
if not frame:
# handle error:
# 'NoneType' object has no attribute 'opcode'
raise WebSocketException("Not a valid frame %s" % frame)
elif frame.opcode in (ABNF.OPCODE_TEXT, ABNF.OPCODE_BINARY, ABNF.OPCODE_CONT):
if frame.opcode == ABNF.OPCODE_CONT and not self._cont_data:
raise WebSocketException("Illegal frame")
if self._cont_data:
self._cont_data[1] += frame.data
else:
self._cont_data = [frame.opcode, frame.data]
if frame.fin:
data = self._cont_data
self._cont_data = None
return data
elif frame.opcode == ABNF.OPCODE_CLOSE:
self.send_close()
return (frame.opcode, None)
elif frame.opcode == ABNF.OPCODE_PING:
self.pong(frame.data)
def recv_frame(self):
"""
recieve data as frame from server.
return value: ABNF frame object.
"""
# Header
if self._frame_header is None:
self._frame_header = self._recv_strict(2)
b1 = ord(self._frame_header[0])
fin = b1 >> 7 & 1
rsv1 = b1 >> 6 & 1
rsv2 = b1 >> 5 & 1
rsv3 = b1 >> 4 & 1
opcode = b1 & 0xf
b2 = ord(self._frame_header[1])
has_mask = b2 >> 7 & 1
# Frame length
if self._frame_length is None:
length_bits = b2 & 0x7f
if length_bits == 0x7e:
length_data = self._recv_strict(2)
self._frame_length = struct.unpack("!H", length_data)[0]
elif length_bits == 0x7f:
length_data = self._recv_strict(8)
self._frame_length = struct.unpack("!Q", length_data)[0]
else:
self._frame_length = length_bits
# Mask
if self._frame_mask is None:
self._frame_mask = self._recv_strict(4) if has_mask else ""
# Payload
payload = self._recv_strict(self._frame_length)
if has_mask:
payload = ABNF.mask(self._frame_mask, payload)
# Reset for next frame
self._frame_header = None
self._frame_length = None
self._frame_mask = None
return ABNF(fin, rsv1, rsv2, rsv3, opcode, has_mask, payload)
def send_close(self, status=STATUS_NORMAL, reason=""):
"""
send close data to the server.
status: status code to send. see STATUS_XXX.
reason: the reason to close. This must be string.
"""
if status < 0 or status >= ABNF.LENGTH_16:
raise ValueError("code is invalid range")
self.send(struct.pack('!H', status) + reason, ABNF.OPCODE_CLOSE)
def close(self, status=STATUS_NORMAL, reason=""):
"""
Close Websocket object
status: status code to send. see STATUS_XXX.
reason: the reason to close. This must be string.
"""
try:
self.sock.shutdown(socket.SHUT_RDWR)
except Exception:
pass
self._closeInternal()
def _closeInternal(self):
self.connected = False
self.sock.close()
def _send(self, data):
try:
return self.sock.send(data)
except socket.timeout as e:
raise WebSocketTimeoutException(e.args[0])
except Exception as e:
if "timed out" in e.args[0]:
raise WebSocketTimeoutException(e.args[0])
else:
raise e
def _recv(self, bufsize):
try:
bytes_ = self.sock.recv(bufsize)
except socket.timeout as e:
raise WebSocketTimeoutException(e.args[0])
except SSLError as e:
if e.args[0] == "The read operation timed out":
raise WebSocketTimeoutException(e.args[0])
else:
raise
if not bytes_:
raise WebSocketConnectionClosedException()
return bytes_
def _recv_strict(self, bufsize):
shortage = bufsize - sum(len(x) for x in self._recv_buffer)
while shortage > 0:
bytes_ = self._recv(shortage)
self._recv_buffer.append(bytes_)
shortage -= len(bytes_)
unified = "".join(self._recv_buffer)
if shortage == 0:
self._recv_buffer = []
return unified
else:
self._recv_buffer = [unified[bufsize:]]
return unified[:bufsize]
def _recv_line(self):
line = []
while True:
c = self._recv(1)
line.append(c)
if c == "\n":
break
return "".join(line)
class WebSocketApp(object):
"""
Higher level of APIs are provided.
The interface is like JavaScript WebSocket object.
"""
def __init__(self, url, header=None,
on_open=None, on_message=None, on_error=None,
on_close=None, keep_running=True, get_mask_key=None):
"""
url: websocket url.
header: custom header for websocket handshake.
on_open: callable object which is called at opening websocket.
this function has one argument. The arugment is this class object.
on_message: callbale object which is called when recieved data.
on_message has 2 arguments.
The 1st arugment is this class object.
The passing 2nd arugment is utf-8 string which we get from the server.
on_error: callable object which is called when we get error.
on_error has 2 arguments.
The 1st arugment is this class object.
The passing 2nd arugment is exception object.
on_close: callable object which is called when closed the connection.
this function has one argument. The arugment is this class object.
keep_running: a boolean flag indicating whether the app's main loop should
keep running, defaults to True
get_mask_key: a callable to produce new mask keys, see the WebSocket.set_mask_key's
docstring for more information
"""
self.url = url
self.header = [] if header is None else header
self.on_open = on_open
self.on_message = on_message
self.on_error = on_error
self.on_close = on_close
self.keep_running = keep_running
self.get_mask_key = get_mask_key
self.sock = None
def send(self, data, opcode=ABNF.OPCODE_TEXT):
"""
send message.
data: message to send. If you set opcode to OPCODE_TEXT, data must be utf-8 string or unicode.
opcode: operation code of data. default is OPCODE_TEXT.
"""
if self.sock.send(data, opcode) == 0:
raise WebSocketConnectionClosedException()
def close(self):
"""
close websocket connection.
"""
self.keep_running = False
if self.sock != None:
self.sock.close()
def _send_ping(self, interval):
while True:
for _ in range(interval):
app.APP.monitor.waitForAbort(1)
if not self.keep_running:
return
self.sock.ping()
def run_forever(self, sockopt=None, sslopt=None, ping_interval=0):
"""
run event loop for WebSocket framework.
This loop is infinite loop and is alive during websocket is available.
sockopt: values for socket.setsockopt.
sockopt must be tuple and each element is argument of
sock.setscokopt.
sslopt: ssl socket optional dict.
ping_interval: automatically send "ping" command every specified
period(second)
if set to 0, not send automatically.
"""
if sockopt is None:
sockopt = []
if sslopt is None:
sslopt = {}
if self.sock:
raise WebSocketException("socket is already opened")
thread = None
self.keep_running = True
try:
self.sock = WebSocket(self.get_mask_key,
sockopt=sockopt,
sslopt=sslopt)
self.sock.settimeout(DEFAULT_TIMEOUT)
self.sock.connect(self.url, header=self.header)
self._callback(self.on_open)
if ping_interval:
thread = threading.Thread(target=self._send_ping,
args=(ping_interval,))
thread.setDaemon(True)
thread.start()
while self.keep_running:
try:
data = self.sock.recv()
if data is None or self.keep_running is False:
break
self._callback(self.on_message, data)
except Exception, e:
if "timed out" not in e.args[0]:
raise e
except Exception, e:
self._callback(self.on_error, e)
finally:
if thread:
self.keep_running = False
self.sock.close()
self._callback(self.on_close)
self.sock = None
def _callback(self, callback, *args):
if callback:
try:
callback(self, *args)
except Exception, e:
LOG.error(e)
_, _, tb = sys.exc_info()
traceback.print_tb(tb)
if __name__ == "__main__":
enable_trace(True)
WEBSOCKET = create_connection("ws://echo.websocket.org/")
LOG.info("Sending 'Hello, World'...")
WEBSOCKET.send("Hello, World")
LOG.info("Sent")
LOG.info("Receiving...")
RESULT = WEBSOCKET.recv()
LOG.info("Received '%s'", RESULT)
WEBSOCKET.close()

View file

@ -1,58 +1,160 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
from json import loads
from ssl import CERT_NONE
import json
from . import backgroundthread, websocket, utils, companion, app, variables as v
from . import websocket
from . import backgroundthread, app, variables as v, utils, companion
###############################################################################
log = getLogger('PLEX.websocket')
LOG = getLogger('PLEX.websocket_client')
PMS_PATH = '/:/websockets/notifications'
###############################################################################
PMS_INTERESTING_MESSAGE_TYPES = ('playing', 'timeline', 'activity')
SETTINGS_STRING = '_status'
class WebSocket(backgroundthread.KillableThread):
opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
status_setting = None
def get_pms_uri():
uri = app.CONN.server
if not uri:
return
# Get the appropriate prefix for the websocket
if uri.startswith('https'):
uri = "wss%s" % uri[5:]
else:
uri = "ws%s" % uri[4:]
uri += PMS_PATH
log.debug('uri to connect pms websocket: %s', uri)
if app.ACCOUNT.pms_token:
uri += '?X-Plex-Token=' + app.ACCOUNT.pms_token
return uri
def __init__(self):
self.ws = None
self.redirect_uri = None
self.sleeptime = 0.0
super(WebSocket, self).__init__()
def close_websocket(self):
if self.ws is not None:
self.ws.close()
self.ws = None
def get_alexa_uri():
if not app.ACCOUNT.plex_token:
return
return ('wss://pubsub.plex.tv/sub/websockets/%s/%s?X-Plex-Token=%s'
% (app.ACCOUNT.plex_user_id,
v.PKC_MACHINE_IDENTIFIER,
app.ACCOUNT.plex_token))
def process(self, opcode, message):
raise NotImplementedError
def receive(self, ws):
# Not connected yet
if ws is None:
raise websocket.WebSocketConnectionClosedException
def pms_on_message(ws, message):
"""
Called when we receive a message from the PMS, e.g. when a new library
item has been added.
"""
try:
message = json.loads(message)
except ValueError as err:
log.error('Error decoding PMS websocket message: %s', err)
log.error('message: %s', message)
return
try:
message = message['NotificationContainer']
typus = message['type']
except KeyError:
log.error('Could not parse PMS message: %s', message)
return
# Triage
if typus not in PMS_INTERESTING_MESSAGE_TYPES:
# Drop everything we're not interested in
return
else:
# Put PMS message on queue and let libsync take care of it
app.APP.websocket_queue.put(message)
frame = ws.recv_frame()
if not frame:
raise websocket.WebSocketException("Not a valid frame %s" % frame)
elif frame.opcode in self.opcode_data:
return frame.opcode, frame.data
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
ws.send_close()
return frame.opcode, None
elif frame.opcode == websocket.ABNF.OPCODE_PING:
ws.pong("Hi!")
return None, None
def alexa_on_message(ws, message):
"""
Called when we receive a message from Alexa
"""
log.debug('alexa message received: %s', message)
try:
message = utils.etree.fromstring(message)
except Exception as err:
log.error('Error decoding message from Alexa: %s %s', type(err), err)
log.error('message from Alexa: %s', message)
return
try:
if message.attrib['command'] == 'processRemoteControlCommand':
message = message[0]
else:
log.error('Unknown Alexa message received: %s', message)
return
companion.process_command(message.attrib['path'][1:], message.attrib)
except Exception as err:
log.exception('Could not parse Alexa message, error: %s %s',
type(err), err)
log.error('message: %s', message)
def getUri(self):
raise NotImplementedError
def _sleep_cycle(self):
def on_error(ws, error):
status = ws.name + SETTINGS_STRING
if isinstance(error, IOError):
# We are probably offline
log.debug('%s: IOError connecting', ws.name)
# Status = IOError - not connected
utils.settings(status, value=utils.lang(39092))
ws.sleep_cycle()
elif isinstance(error, websocket.WebSocketTimeoutException):
log.debug('%s: WebSocketTimeoutException', ws.name)
# Status = 'Timeout - not connected'
utils.settings(status, value=utils.lang(39091))
ws.sleep_cycle()
elif isinstance(error, websocket.WebSocketConnectionClosedException):
log.debug('%s: WebSocketConnectionClosedException', ws.name)
# Status = Not connected
utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208))
elif isinstance(error, websocket.WebSocketBadStatusException):
# Most likely Alexa not connecting, throwing a 403
log.debug('%s: got a bad HTTP status: %s', ws.name, error)
# Status = <value of exception>
utils.settings(status, value=str(error))
ws.sleep_cycle()
elif isinstance(error, websocket.WebSocketException):
log.error('%s: got another websocket exception %s: %s',
ws.name, type(error), error)
# Status = Error
utils.settings(status, value=utils.lang(257))
ws.sleep_cycle()
elif isinstance(error, SystemExit):
log.debug('%s: SystemExit detected', ws.name)
# Status = Not connected
utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208))
else:
log.exception('%s: got an unexpected exception of type %s: %s',
ws.name, type(error), error)
# Status = Error
utils.settings(status, value=utils.lang(257))
raise RuntimeError
def on_close(ws):
"""
This does not seem to get called by our websocket client :-(
"""
log.debug('%s: connection closed', ws.name)
# Status = Not connected
utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(15208))
def on_open(ws):
log.debug('%s: connected', ws.name)
# Status = Connected
utils.settings(ws.name + SETTINGS_STRING, value=utils.lang(13296))
ws.sleeptime = 0
class PlexWebSocketApp(websocket.WebSocketApp,
backgroundthread.KillableThread):
def __init__(self, **kwargs):
self.sleeptime = 0
backgroundthread.KillableThread.__init__(self)
websocket.WebSocketApp.__init__(self, self.get_uri(), **kwargs)
def sleep_cycle(self):
"""
Sleeps for 2^self.sleeptime where sleeping period will be doubled with
each unsuccessful connection attempt.
@ -60,226 +162,133 @@ class WebSocket(backgroundthread.KillableThread):
"""
self.sleep(2 ** self.sleeptime)
if self.sleeptime < 6:
self.sleeptime += 1.0
self.sleeptime += 1
def suspend(self, block=False, timeout=None):
"""
Call this method from another thread to suspend this websocket thread
"""
self.close()
backgroundthread.KillableThread.suspend(self, block, timeout)
def cancel(self):
"""
Call this method from another thread to cancel this websocket thread
"""
self.close()
backgroundthread.KillableThread.cancel(self)
def run(self):
LOG.info("----===## Starting %s ##===----", self.__class__.__name__)
"""
Ensure that sockets will be closed no matter what
"""
log.info("----===## Starting %s ##===----", self.name)
app.APP.register_thread(self)
try:
self._run()
except RuntimeError:
pass
except Exception as err:
log.exception('Exception of type %s occured: %s', type(err), err)
finally:
self.close_websocket()
self.close()
# Status = Not connected
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(15208))
app.APP.deregister_thread(self)
LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__)
log.info("----===## %s stopped ##===----", self.name)
def _run(self):
while not self.should_cancel():
# In the event the server goes offline
if self.should_suspend():
# Set in service.py
self.close_websocket()
# Status = 'Suspended - not connected'
utils.settings(self.status_setting, value=utils.lang(39090))
while self.should_suspend():
# We will be caught in this loop if either another thread
# called the suspend() method, thus setting _suspended = True
# OR if there any other conditions to not open a websocket
# connection - see methods should_suspend() below
# Status = Suspended - not connected
self.set_suspension_settings_status()
if self.wait_while_suspended():
# Abort was requested while waiting. We should exit
return
try:
self.process(*self.receive(self.ws))
except websocket.WebSocketTimeoutException:
# No worries if read timed out
pass
except websocket.WebSocketConnectionClosedException:
LOG.debug("%s: connection closed, (re)connecting",
self.__class__.__name__)
uri, sslopt = self.getUri()
try:
# Low timeout - let's us shut this thread down!
self.ws = websocket.create_connection(
uri,
timeout=1,
sslopt=sslopt,
enable_multithread=True)
except IOError:
# Server is probably offline
LOG.debug("%s: IOError connecting", self.__class__.__name__)
self.ws = None
# Status = IOError - not connected
utils.settings(self.status_setting,
value=utils.lang(39092))
self._sleep_cycle()
except websocket.WebSocketTimeoutException:
LOG.debug("%s: WebSocketTimeoutException", self.__class__.__name__)
self.ws = None
# Status = 'Timeout - not connected'
utils.settings(self.status_setting,
value=utils.lang(39091))
self._sleep_cycle()
except websocket.WebsocketRedirect as e:
LOG.debug('301 redirect detected: %s', e)
self.redirect_uri = e.headers.get('location',
e.headers.get('Location'))
if self.redirect_uri:
self.redirect_uri = self.redirect_uri.decode('utf-8')
self.ws = None
self._sleep_cycle()
except websocket.WebSocketException as e:
LOG.debug('%s: WebSocketException: %s', self.__class__.__name__, e)
self.ws = None
# Status = Error
utils.settings(self.status_setting,
value=utils.lang(257))
self._sleep_cycle()
except Exception as e:
LOG.error('%s: Unknown exception encountered when '
'connecting: %s', self.__class__.__name__, e)
import traceback
LOG.error("%s: Traceback:\n%s",
self.__class__.__name__, traceback.format_exc())
self.ws = None
# Status = Error
utils.settings(self.status_setting,
value=utils.lang(257))
self._sleep_cycle()
else:
self.sleeptime = 0.0
# Status = Connected
utils.settings(self.status_setting,
value=utils.lang(13296))
except Exception as e:
LOG.error("%s: Unknown exception encountered: %s",
self.__class__.__name__, e)
import traceback
LOG.error("%s: Traceback:\n%s",
self.__class__.__name__, traceback.format_exc())
self.close_websocket()
# Status = Error
utils.settings(self.status_setting, value=utils.lang(257))
if not self._suspended:
# because wait_while_suspended will return instantly if
# this thread did not get suspended from another thread
self.sleep_cycle()
self.url = self.get_uri()
if not self.url:
self.sleep_cycle()
continue
self.run_forever()
class PMS_Websocket(WebSocket):
"""
Websocket connection with the PMS for Plex Companion
"""
status_setting = 'pms_websocket_status'
class PMSWebsocketApp(PlexWebSocketApp):
name = 'pms_websocket'
def get_uri(self):
return get_pms_uri()
def should_suspend(self):
"""
Returns True if the thread is suspended.
Returns True if the thread needs to suspend.
"""
suspend = self._suspended or app.SYNC.background_sync_disabled
if suspend:
# This thread needs to clear the Event() _is_not_suspended itself!
self.suspend()
return suspend
return (self._suspended or
utils.settings('enableBackgroundSync') != 'true')
def getUri(self):
if self.redirect_uri:
uri = self.redirect_uri
self.redirect_uri = None
def set_suspension_settings_status(self):
if utils.settings('enableBackgroundSync') != 'true':
# Status = Disabled
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(24023))
else:
server = app.CONN.server
# Get the appropriate prefix for the websocket
if server.startswith('https'):
server = "wss%s" % server[5:]
else:
server = "ws%s" % server[4:]
uri = "%s/:/websockets/notifications" % server
if app.ACCOUNT.pms_token:
uri += '?X-Plex-Token=%s' % app.ACCOUNT.pms_token
sslopt = {}
if v.KODIVERSION == 17 and utils.settings('sslverify') == "false":
sslopt["cert_reqs"] = CERT_NONE
LOG.debug("%s: Uri: %s, sslopt: %s",
self.__class__.__name__, uri, sslopt)
return uri, sslopt
def process(self, opcode, message):
if opcode not in self.opcode_data:
return
try:
message = loads(message)
except ValueError as err:
LOG.error('%s: Error decoding message from websocket: %s',
self.__class__.__name__, err)
LOG.error(message)
return
try:
message = message['NotificationContainer']
except KeyError:
LOG.error('%s: Could not parse PMS message: %s',
self.__class__.__name__, message)
return
# Triage
typus = message.get('type')
if typus is None:
LOG.error('%s: No message type, dropping message: %s',
self.__class__.__name__, message)
return
LOG.debug('%s: Received message from PMS server: %s',
self.__class__.__name__, message)
# Drop everything we're not interested in
if typus not in ('playing', 'timeline', 'activity'):
return
else:
# Put PMS message on queue and let libsync take care of it
app.APP.websocket_queue.put(message)
# Status = 'Suspended - not connected'
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(39093))
class Alexa_Websocket(WebSocket):
"""
Websocket connection to talk to Amazon Alexa.
"""
status_setting = 'alexa_websocket_status'
class AlexaWebsocketApp(PlexWebSocketApp):
name = 'alexa_websocket'
def get_uri(self):
return get_alexa_uri()
def should_suspend(self):
"""
Overwrite method since we need to check for plex token
Returns True if the thread needs to suspend.
"""
suspend = self._suspended or \
not app.SYNC.enable_alexa or \
return self._suspended or \
utils.settings('enable_alexa') != 'true' or \
app.ACCOUNT.restricted_user or \
not app.ACCOUNT.plex_token
if suspend:
# This thread needs to clear the Event() _is_not_suspended itself!
self.suspend()
return suspend
def getUri(self):
if self.redirect_uri:
uri = self.redirect_uri
self.redirect_uri = None
def set_suspension_settings_status(self):
if utils.settings('enable_alexa') != 'true':
# Status = Disabled
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(24023))
elif app.ACCOUNT.restricted_user:
# Status = Managed Plex User - not connected
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(39094))
elif not app.ACCOUNT.plex_token:
# Status = Not logged in to plex.tv
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(39226))
else:
uri = ('wss://pubsub.plex.tv/sub/websockets/%s/%s?X-Plex-Token=%s'
% (app.ACCOUNT.plex_user_id,
v.PKC_MACHINE_IDENTIFIER,
app.ACCOUNT.plex_token))
sslopt = {}
LOG.debug("%s: Uri: %s, sslopt: %s",
self.__class__.__name__, uri, sslopt)
return uri, sslopt
# Status = 'Suspended - not connected'
utils.settings(self.name + SETTINGS_STRING,
value=utils.lang(39093))
def process(self, opcode, message):
if opcode not in self.opcode_data:
return
LOG.debug('%s: Received the following message from Alexa:',
self.__class__.__name__)
LOG.debug('%s: %s', self.__class__.__name__, message)
try:
message = utils.defused_etree.fromstring(message)
except Exception as ex:
LOG.error('%s: Error decoding message from Alexa: %s',
self.__class__.__name__, ex)
return
try:
if message.attrib['command'] == 'processRemoteControlCommand':
message = message[0]
else:
LOG.error('%s: Unknown Alexa message received',
self.__class__.__name__)
return
except Exception:
LOG.error('%s: Could not parse Alexa message',
self.__class__.__name__)
return
companion.process_command(message.attrib['path'][1:], message.attrib)
def get_pms_websocketapp():
return PMSWebsocketApp(on_open=on_open,
on_message=pms_on_message,
on_error=on_error,
on_close=on_close)
def get_alexa_websocketapp():
return AlexaWebsocketApp(on_open=on_open,
on_message=alexa_on_message,
on_error=on_error,
on_close=on_close)