294 lines
10 KiB
Python
294 lines
10 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
from __future__ import absolute_import, division, unicode_literals
|
|
from logging import getLogger
|
|
import json
|
|
|
|
from . import websocket
|
|
from . import backgroundthread, app, variables as v, utils, companion
|
|
|
|
# Logger used by all functions and classes of this module
log = getLogger('PLEX.websocket')

# Path appended to the PMS address to reach its notification websocket
PMS_PATH = '/:/websockets/notifications'

# PMS notification types that are put on the websocket queue; any other type
# is dropped by pms_on_message
PMS_INTERESTING_MESSAGE_TYPES = (
    'playing',
    'timeline',
    'activity',
)

# Suffix appended to a websocket's name to get the id of its status setting
SETTINGS_STRING = '_status'
|
|
|
|
|
|
def get_pms_uri():
    """
    Build the websocket address for the currently selected PMS.

    Returns None if app.CONN.server is empty. Otherwise the leading scheme of
    the server address is swapped for the websocket one ('wss' if the address
    starts with 'https', 'ws' in every other case), PMS_PATH is appended and,
    if app.ACCOUNT.pms_token is set, the token is added as the X-Plex-Token
    query parameter.
    """
    server = app.CONN.server
    if not server:
        return
    # Swap the http(s) prefix for the matching websocket prefix
    if server.startswith('https'):
        address = "wss%s" % server[5:]
    else:
        address = "ws%s" % server[4:]
    address += PMS_PATH
    # Logged before the token gets appended
    log.debug('uri to connect pms websocket: %s', address)
    token = app.ACCOUNT.pms_token
    if not token:
        return address
    return address + '?X-Plex-Token=' + token
|
|
|
|
|
|
def get_alexa_uri():
    """
    Build the plex.tv pubsub websocket address used for Alexa.

    Returns None if app.ACCOUNT.plex_token is not set. Otherwise the address
    is assembled from the Plex user id, the PKC machine identifier and the
    plex.tv token.
    """
    if not app.ACCOUNT.plex_token:
        return
    template = 'wss://pubsub.plex.tv/sub/websockets/%s/%s?X-Plex-Token=%s'
    parts = (app.ACCOUNT.plex_user_id,
             v.PKC_MACHINE_IDENTIFIER,
             app.ACCOUNT.plex_token)
    return template % parts
|
|
|
|
|
|
def pms_on_message(ws, message):
    """
    Called when we receive a message from the PMS, e.g. when a new library
    item has been added.

    The raw message is decoded as JSON and unwrapped from its
    'NotificationContainer'. Notifications whose 'type' is listed in
    PMS_INTERESTING_MESSAGE_TYPES are put on app.APP.websocket_queue so that
    libsync can take care of them; everything else is dropped. Messages that
    cannot be decoded or that do not have the expected layout are logged and
    ignored. `ws` is not used.
    """
    try:
        message = json.loads(message)
    except ValueError as err:
        log.error('Error decoding PMS websocket message: %s', err)
        log.error('message: %s', message)
        return
    try:
        message = message['NotificationContainer']
        typus = message['type']
    except (KeyError, TypeError):
        # KeyError: a JSON object that lacks one of the expected keys.
        # TypeError: valid JSON that is not an object at all (list, string,
        # number, null) and therefore cannot be looked up by key.
        log.error('Could not parse PMS message: %s', message)
        return
    # Triage: only pass on what we're interested in, drop everything else
    if typus in PMS_INTERESTING_MESSAGE_TYPES:
        # Put PMS message on queue and let libsync take care of it
        app.APP.websocket_queue.put(message)
|
|
|
|
|
|
def alexa_on_message(ws, message):
    """
    Called when we receive a message from Alexa.

    The message is parsed as XML. If the root element's 'command' attribute
    is 'processRemoteControlCommand', its first child is handed over to
    companion.process_command together with the child's 'path' attribute
    (without its first character). Any failure is logged and swallowed.
    `ws` is not used.
    """
    log.debug('alexa message received: %s', message)
    try:
        xml = utils.etree.fromstring(message)
    except Exception as err:
        log.error('Error decoding message from Alexa: %s %s', type(err), err)
        log.error('message from Alexa: %s', message)
        return
    try:
        if xml.attrib['command'] != 'processRemoteControlCommand':
            log.error('Unknown Alexa message received: %s', xml)
            return
        # The actual command is the first child of the root element
        xml = xml[0]
        companion.process_command(xml.attrib['path'][1:], xml.attrib)
    except Exception as err:
        log.exception('Could not parse Alexa message, error: %s %s',
                      type(err), err)
        log.error('message: %s', xml)
|
|
|
|
|
|
def on_error(ws, error):
    """
    Error callback shared by the websocket apps of this module.

    Depending on the type of `error`, a line is logged and the websocket's
    status setting (ws.name + SETTINGS_STRING) is updated. For IOError,
    timeouts, bad HTTP status and other websocket exceptions ws.sleep_cycle()
    is called afterwards. A closed connection and SystemExit only update the
    status. Any other exception type is logged with traceback, the status is
    set to 'Error' and RuntimeError is raised.
    """
    status_key = ws.name + SETTINGS_STRING
    # Whether to back off with ws.sleep_cycle() once the status is written
    back_off = True
    # Whether to bail out with a RuntimeError once the status is written
    unexpected = False
    if isinstance(error, IOError):
        # We are probably offline
        log.debug('%s: IOError connecting', ws.name)
        # Status = IOError - not connected
        text = utils.lang(39092)
    elif isinstance(error, websocket.WebSocketTimeoutException):
        log.debug('%s: WebSocketTimeoutException', ws.name)
        # Status = 'Timeout - not connected'
        text = utils.lang(39091)
    elif isinstance(error, websocket.WebSocketConnectionClosedException):
        log.debug('%s: WebSocketConnectionClosedException', ws.name)
        # Status = Not connected
        text = utils.lang(15208)
        back_off = False
    elif isinstance(error, websocket.WebSocketBadStatusException):
        # Most likely Alexa not connecting, throwing a 403
        log.debug('%s: got a bad HTTP status: %s', ws.name, error)
        # Status = <value of exception>
        text = str(error)
    elif isinstance(error, websocket.WebSocketException):
        log.error('%s: got another websocket exception %s: %s',
                  ws.name, type(error), error)
        # Status = Error
        text = utils.lang(257)
    elif isinstance(error, SystemExit):
        log.debug('%s: SystemExit detected', ws.name)
        # Status = Not connected
        text = utils.lang(15208)
        back_off = False
    else:
        log.exception('%s: got an unexpected exception of type %s: %s',
                      ws.name, type(error), error)
        # Status = Error
        text = utils.lang(257)
        back_off = False
        unexpected = True
    utils.settings(status_key, value=text)
    if unexpected:
        raise RuntimeError
    if back_off:
        ws.sleep_cycle()
|
|
|
|
|
|
def on_close(ws):
    """
    Close callback: logs the event and sets the websocket's status setting to
    'Not connected'.

    This does not seem to get called by our websocket client :-(
    """
    log.debug('%s: connection closed', ws.name)
    status_key = ws.name + SETTINGS_STRING
    # Status = Not connected
    utils.settings(status_key, value=utils.lang(15208))
|
|
|
|
|
|
def on_open(ws):
    """
    Open callback: logs the event, sets the websocket's status setting to
    'Connected' and resets the back-off exponent used by ws.sleep_cycle().
    """
    log.debug('%s: connected', ws.name)
    status_key = ws.name + SETTINGS_STRING
    # Status = Connected
    utils.settings(status_key, value=utils.lang(13296))
    # Start over with the shortest sleeping period on the next failure
    ws.sleeptime = 0
|
|
|
|
|
|
class PlexWebSocketApp(websocket.WebSocketApp,
                       backgroundthread.KillableThread):
    """
    Websocket app that also is a killable background thread.

    This class uses self.name, self.get_uri(), self.should_suspend() and
    self.set_suspension_settings_status() without defining them itself; the
    subclasses in this module provide them.
    """
    def __init__(self, **kwargs):
        # Exponent of the sleeping period used by sleep_cycle()
        self.sleeptime = 0
        backgroundthread.KillableThread.__init__(self)
        websocket.WebSocketApp.__init__(self, self.get_uri(), **kwargs)

    def sleep_cycle(self):
        """
        Sleep for 2^self.sleeptime, then bump self.sleeptime by one so that
        the next call sleeps twice as long.

        self.sleeptime is not raised beyond 6, so the longest sleeping period
        is 64 seconds.
        """
        self.sleep(2 ** self.sleeptime)
        if self.sleeptime >= 6:
            return
        self.sleeptime += 1

    def suspend(self, block=False, timeout=None):
        """
        Close the websocket connection, then suspend the thread.

        Meant to be called from another thread.
        """
        self.close()
        backgroundthread.KillableThread.suspend(self, block, timeout)

    def cancel(self):
        """
        Close the websocket connection, then cancel the thread.

        Meant to be called from another thread.
        """
        self.close()
        backgroundthread.KillableThread.cancel(self)

    def run(self):
        """
        Thread entry point. Wraps _run() to make sure that the socket gets
        closed, the status setting is reset to 'Not connected' and the thread
        is deregistered no matter how _run() ends.
        """
        log.info("----===## Starting %s ##===----", self.name)
        app.APP.register_thread(self)
        try:
            self._run()
        except RuntimeError:
            # Silently stop; on_error raises RuntimeError after it already
            # logged an unexpected exception - presumably that is what ends
            # up here
            pass
        except Exception as err:
            log.exception('Exception of type %s occured: %s', type(err), err)
        finally:
            self.close()
            status_key = self.name + SETTINGS_STRING
            # Status = Not connected
            utils.settings(status_key, value=utils.lang(15208))
            app.APP.deregister_thread(self)
            log.info("----===## %s stopped ##===----", self.name)

    def _run(self):
        """
        Main loop: wait while suspended, then (re-)connect using a freshly
        fetched uri, until the thread is cancelled.
        """
        while not self.should_cancel():
            # In the event the server goes offline
            while self.should_suspend():
                # We end up here if either another thread called suspend(),
                # thus setting _suspended = True, OR if there is any other
                # reason not to open a websocket connection - see the
                # should_suspend() methods of the subclasses
                self.set_suspension_settings_status()
                if self.wait_while_suspended():
                    # Abort was requested while waiting. We should exit
                    return
                if not self._suspended:
                    # wait_while_suspended returns instantly if this thread
                    # did not get suspended from another thread, so sleep
                    # here instead
                    self.sleep_cycle()
            self.url = self.get_uri()
            if self.url:
                self.run_forever()
            else:
                # Nothing to connect to yet; try again later
                self.sleep_cycle()
|
|
|
|
|
|
class PMSWebsocketApp(PlexWebSocketApp):
    """
    Websocket connection to the Plex Media Server.
    """
    name = 'pms_websocket'

    def get_uri(self):
        """
        Returns the PMS websocket address as built by get_pms_uri().
        """
        return get_pms_uri()

    def should_suspend(self):
        """
        Returns a truthy value if the thread needs to suspend: either it got
        suspended explicitly or the setting 'enableBackgroundSync' is not
        'true'.
        """
        suspended = self._suspended
        if suspended:
            return suspended
        return utils.settings('enableBackgroundSync') != 'true'

    def set_suspension_settings_status(self):
        """
        Writes the reason for the suspension to this websocket's status
        setting.
        """
        if utils.settings('enableBackgroundSync') != 'true':
            # Status = Disabled
            label = 24023
        else:
            # Status = 'Suspended - not connected'
            label = 39093
        utils.settings(self.name + SETTINGS_STRING, value=utils.lang(label))
|
|
|
|
|
|
class AlexaWebsocketApp(PlexWebSocketApp):
    """
    Websocket connection to plex.tv used for Alexa.
    """
    name = 'alexa_websocket'

    def get_uri(self):
        """
        Returns the Alexa websocket address as built by get_alexa_uri().
        """
        return get_alexa_uri()

    def should_suspend(self):
        """
        Returns a truthy value if the thread needs to suspend: it got
        suspended explicitly, the setting 'enable_alexa' is not 'true', the
        current user is a restricted user or there is no plex.tv token.
        """
        return (self._suspended or
                utils.settings('enable_alexa') != 'true' or
                app.ACCOUNT.restricted_user or
                not app.ACCOUNT.plex_token)

    def set_suspension_settings_status(self):
        """
        Writes the reason for the suspension to this websocket's status
        setting.
        """
        if utils.settings('enable_alexa') != 'true':
            # Status = Disabled
            label = 24023
        elif app.ACCOUNT.restricted_user:
            # Status = Managed Plex User - not connected
            label = 39094
        elif not app.ACCOUNT.plex_token:
            # Status = Not logged in to plex.tv
            label = 39226
        else:
            # Status = 'Suspended - not connected'
            label = 39093
        utils.settings(self.name + SETTINGS_STRING, value=utils.lang(label))
|
|
|
|
|
|
def get_pms_websocketapp():
    """
    Returns a new PMSWebsocketApp wired up with this module's callbacks.
    """
    callbacks = dict(on_open=on_open,
                     on_message=pms_on_message,
                     on_error=on_error,
                     on_close=on_close)
    return PMSWebsocketApp(**callbacks)
|
|
|
|
|
|
def get_alexa_websocketapp():
    """
    Returns a new AlexaWebsocketApp wired up with this module's callbacks.
    """
    callbacks = dict(on_open=on_open,
                     on_message=alexa_on_message,
                     on_error=on_error,
                     on_close=on_close)
    return AlexaWebsocketApp(**callbacks)
|