PlexKodiConnect/resources/lib/plex_companion/polling.py

196 lines
7 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from logging import getLogger
import requests
from .processing import process_proxy_xml
from .common import proxy_headers, proxy_params, log_error
from .. import utils
from .. import backgroundthread
from .. import app
from .. import variables as v
# Disable annoying requests warnings
import requests.packages.urllib3
requests.packages.urllib3.disable_warnings()
# Timeout (connection timeout, read timeout)
# The later is up to 20 seconds, if the PMS has nothing to tell us
# THIS WILL PREVENT PKC FROM SHUTTING DOWN CORRECTLY
TIMEOUT = (5.0, 4.0)
# Max. timeout for the Listener: 2 ^ MAX_TIMEOUT
# Corresponds to 2 ^ 7 = 128 seconds
MAX_TIMEOUT = 7
log = getLogger('PLEX.companion.listener')
class Listener(backgroundthread.KillableThread):
"""
Opens a GET HTTP connection to the current PMS (that will time-out PMS-wise
after ~20 seconds) and listens for any commands by the PMS. Listening
will cause this PKC client to be registered as a Plex Companien client.
"""
daemon = True
def __init__(self, playstate_mgr):
self.s = None
self.playstate_mgr = playstate_mgr
self._sleep_timer = 0
super().__init__()
def _get_requests_session(self):
if self.s is None:
log.debug('Creating new requests session')
self.s = requests.Session()
self.s.headers = proxy_headers()
self.s.verify = app.CONN.verify_ssl_cert
if app.CONN.ssl_cert_path:
self.s.cert = app.CONN.ssl_cert_path
self.s.params = proxy_params()
return self.s
def close_requests_session(self):
try:
self.s.close()
except AttributeError:
# "thread-safety" - Just in case s was set to None in the
# meantime
pass
self.s = None
def _unauthorized(self):
"""Puts this thread to sleep until e.g. a PMS changes wakes it up"""
log.warn('We are not authorized to poll the PMS (http error 401). '
'Plex Companion will not work.')
self.suspend()
def _on_connection_error(self, req=None):
if req:
log_error(log.error, 'Error while contacting the PMS', req)
self.sleep(2 ^ self._sleep_timer)
if self._sleep_timer < MAX_TIMEOUT:
self._sleep_timer += 1
def ok_message(self, command_id):
url = f'{app.CONN.server}/player/proxy/response?commandID={command_id}'
try:
req = self.communicate(self.s.post,
url,
data=v.COMPANION_OK_MESSAGE.encode('utf-8'))
except (requests.RequestException, SystemExit):
return
if not req.ok:
log_error(log.error, 'Error replying OK', req)
@staticmethod
def communicate(method, url, **kwargs):
try:
req = method(url, **kwargs)
except requests.ConnectTimeout:
# The request timed out while trying to connect to the PMS
log.error('Requests ConnectionTimeout!')
raise
except requests.ReadTimeout:
# The PMS did not send any data in the allotted amount of time
log.error('Requests ReadTimeout!')
raise
except requests.TooManyRedirects:
log.error('TooManyRedirects error!')
raise
except requests.HTTPError as error:
log.error('HTTPError: %s', error)
raise
except requests.ConnectionError:
# Caused by PKC terminating the connection prematurely
# log.error('ConnectionError: %s', error)
raise
else:
req.encoding = 'utf-8'
# Access response content once in order to make sure to release the
# underlying sockets
req.content
return req
def run(self):
"""
Ensure that sockets will be closed no matter what
"""
app.APP.register_thread(self)
log.info("----===## Starting PollCompanion ##===----")
try:
self._run()
finally:
self.close_requests_session()
app.APP.deregister_thread(self)
log.info("----===## PollCompanion stopped ##===----")
def _run(self):
while not self.should_cancel():
if self.should_suspend():
self.close_requests_session()
if self.wait_while_suspended():
break
# See if there's anything we need to process
# timeout=1 will cause the PMS to "hold" the connection for approx
# 20 seconds. This will BLOCK requests - not something we can
# circumvent.
url = app.CONN.server + '/player/proxy/poll?timeout=1'
self._get_requests_session()
try:
req = self.communicate(self.s.get,
url,
timeout=TIMEOUT)
except requests.ConnectionError:
# No command received from the PMS - try again immediately
continue
except requests.RequestException:
self._on_connection_error()
continue
except SystemExit:
# We need to quit PKC entirely
break
# Sanity checks
if req.status_code == 401:
# We can't reach a PMS that is not in the local LAN
# This might even lead to e.g. cloudflare blocking us, thinking
# we're staging a DOS attach
self._unauthorized()
continue
elif not req.ok:
self._on_connection_error(req)
continue
elif not ('content-type' in req.headers
and 'xml' in req.headers['content-type']):
self._on_connection_error(req)
continue
if not req.text:
# Means the connection timed-out (usually after 20 seconds),
# because there was no command from the PMS or a client to
# remote-control anything no the PKC-side
# Received an empty body, but still header Content-Type: xml
continue
# Parsing
try:
xml = utils.etree.fromstring(req.content)
cmd = xml[0]
if len(xml) > 1:
# We should always just get ONE command per message
raise IndexError()
except (utils.ParseError, IndexError):
log.error('Could not parse the PMS xml:')
self._on_connection_error()
continue
# Do the work
log.debug('Received a Plex Companion command from the PMS:')
utils.log_xml(xml, log.debug)
self.playstate_mgr.check_subscriber(cmd)
if process_proxy_xml(cmd):
self.ok_message(cmd.get('commandID'))
self._sleep_timer = 0