Major Plex Companion overhaul, part 3

This commit is contained in:
croneter 2017-12-14 08:29:38 +01:00
parent 80c106d57f
commit c0e7c78a11
3 changed files with 168 additions and 166 deletions

View file

@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
"""
The Plex Companion master python file
"""
from logging import getLogger
from threading import Thread
from Queue import Queue, Empty
@ -18,7 +20,7 @@ import state
###############################################################################
log = getLogger("PLEX."+__name__)
LOG = getLogger("PLEX." + __name__)
###############################################################################
@ -26,39 +28,24 @@ log = getLogger("PLEX."+__name__)
@thread_methods(add_suspends=['PMS_STATUS'])
class PlexCompanion(Thread):
"""
Plex Companion monitoring class. Invoke only once
"""
def __init__(self, callback=None):
log.info("----===## Starting PlexCompanion ##===----")
LOG.info("----===## Starting PlexCompanion ##===----")
if callback is not None:
self.mgr = callback
# Start GDM for server/client discovery
self.client = plexgdm.plexgdm()
self.client.clientDetails()
log.debug("Registration string is:\n%s"
% self.client.getClientDetails())
LOG.debug("Registration string is:\n%s",
self.client.getClientDetails())
# kodi player instance
self.player = player.PKC_Player()
self.httpd = False
self.queue = None
Thread.__init__(self)
def _getStartItem(self, string):
"""
Grabs the Plex id from e.g. '/library/metadata/12987'
and returns the tuple (typus, id) where typus is either 'queueId' or
'plexId' and id is the corresponding id as a string
"""
typus = 'plexId'
if string.startswith('/library/metadata'):
try:
string = string.split('/')[3]
except IndexError:
string = ''
else:
log.error('Unknown string! %s' % string)
return typus, string
def processTasks(self, task):
def _process_tasks(self, task):
"""
Processes tasks picked up e.g. by Companion listener, e.g.
{'action': 'playlist',
@ -73,7 +60,7 @@ class PlexCompanion(Thread):
'token': 'transient-cd2527d1-0484-48e0-a5f7-f5caa7d591bd',
'type': 'video'}}
"""
log.debug('Processing: %s' % task)
LOG.debug('Processing: %s', task)
data = task['data']
# Get the token of the user flinging media (might be different one)
@ -84,11 +71,11 @@ class PlexCompanion(Thread):
try:
xml[0].attrib
except (AttributeError, IndexError, TypeError):
log.error('Could not download Plex metadata')
LOG.error('Could not download Plex metadata')
return
api = API(xml[0])
if api.getType() == v.PLEX_TYPE_ALBUM:
log.debug('Plex music album detected')
LOG.debug('Plex music album detected')
queue = self.mgr.playqueue.init_playqueue_from_plex_children(
api.getRatingKey())
queue.plex_transient_token = token
@ -120,11 +107,11 @@ class PlexCompanion(Thread):
elif task['action'] == 'playlist':
# Get the playqueue ID
try:
typus, ID, query = ParseContainerKey(data['containerKey'])
except Exception as e:
log.error('Exception while processing: %s' % e)
_, plex_id, query = ParseContainerKey(data['containerKey'])
except:
LOG.error('Exception while processing')
import traceback
log.error("Traceback:\n%s" % traceback.format_exc())
LOG.error("Traceback:\n%s", traceback.format_exc())
return
try:
playqueue = self.mgr.playqueue.get_playqueue_from_type(
@ -136,14 +123,14 @@ class PlexCompanion(Thread):
try:
xml[0].attrib
except (AttributeError, IndexError, TypeError):
log.error('Could not download Plex metadata')
LOG.error('Could not download Plex metadata')
return
api = API(xml[0])
playqueue = self.mgr.playqueue.get_playqueue_from_type(
v.KODI_PLAYLIST_TYPE_FROM_PLEX_TYPE[api.getType()])
self.mgr.playqueue.update_playqueue_from_PMS(
playqueue,
ID,
plex_id,
repeat=query.get('repeat'),
offset=data.get('offset'))
playqueue.plex_transient_token = token
@ -154,7 +141,7 @@ class PlexCompanion(Thread):
if xml is None:
return
if len(xml) == 0:
log.debug('Empty playqueue received - clearing playqueue')
LOG.debug('Empty playqueue received - clearing playqueue')
plex_type = get_plextype_from_xml(xml)
if plex_type is None:
return
@ -169,9 +156,11 @@ class PlexCompanion(Thread):
data['playQueueID'])
def run(self):
# Ensure that sockets will be closed no matter what
"""
Ensure that sockets will be closed no matter what
"""
try:
self.__run()
self._run()
finally:
try:
self.httpd.socket.shutdown(SHUT_RDWR)
@ -182,10 +171,9 @@ class PlexCompanion(Thread):
self.httpd.socket.close()
except AttributeError:
pass
log.info("----===## Plex Companion stopped ##===----")
LOG.info("----===## Plex Companion stopped ##===----")
def __run(self):
self.httpd = False
def _run(self):
httpd = self.httpd
# Cache for quicker while loops
client = self.client
@ -193,10 +181,9 @@ class PlexCompanion(Thread):
thread_suspended = self.thread_suspended
# Start up instances
requestMgr = httppersist.RequestMgr()
subscriptionManager = subscribers.SubscriptionManager(
requestMgr, self.player, self.mgr)
request_mgr = httppersist.RequestMgr()
subscription_manager = subscribers.SubscriptionMgr(
request_mgr, self.player, self.mgr)
queue = Queue(maxsize=100)
self.queue = queue
@ -207,33 +194,28 @@ class PlexCompanion(Thread):
try:
httpd = listener.ThreadedHTTPServer(
client,
subscriptionManager,
subscription_manager,
queue,
('', v.COMPANION_PORT),
listener.MyHandler)
httpd.timeout = 0.95
break
except:
log.error("Unable to start PlexCompanion. Traceback:")
LOG.error("Unable to start PlexCompanion. Traceback:")
import traceback
log.error(traceback.print_exc())
LOG.error(traceback.print_exc())
sleep(3000)
if start_count == 3:
log.error("Error: Unable to start web helper.")
LOG.error("Error: Unable to start web helper.")
httpd = False
break
start_count += 1
else:
log.info('User deactivated Plex Companion')
LOG.info('User deactivated Plex Companion')
client.start_all()
message_count = 0
if httpd:
t = Thread(target=httpd.handle_request)
thread = Thread(target=httpd.handle_request)
while not thread_stopped():
# If we are not authorized, sleep
@ -246,30 +228,30 @@ class PlexCompanion(Thread):
try:
message_count += 1
if httpd:
if not t.isAlive():
if not thread.isAlive():
# Use threads cause the method will stall
t = Thread(target=httpd.handle_request)
t.start()
thread = Thread(target=httpd.handle_request)
thread.start()
if message_count == 3000:
message_count = 0
if client.check_client_registration():
log.debug("Client is still registered")
LOG.debug('Client is still registered')
else:
log.debug("Client is no longer registered. "
"Plex Companion still running on port %s"
% v.COMPANION_PORT)
LOG.debug('Client is no longer registered. Plex '
'Companion still running on port %s',
v.COMPANION_PORT)
client.register_as_client()
# Get and set servers
if message_count % 30 == 0:
subscriptionManager.serverlist = client.getServerList()
subscriptionManager.notify()
subscription_manager.serverlist = client.getServerList()
subscription_manager.notify()
if not httpd:
message_count = 0
except:
log.warn("Error in loop, continuing anyway. Traceback:")
LOG.warn("Error in loop, continuing anyway. Traceback:")
import traceback
log.warn(traceback.format_exc())
LOG.warn(traceback.format_exc())
# See if there's anything we need to process
try:
task = queue.get(block=False)
@ -277,10 +259,9 @@ class PlexCompanion(Thread):
pass
else:
# Got instructions, process them
self.processTasks(task)
self._process_tasks(task)
queue.task_done()
# Don't sleep
continue
sleep(50)
client.stop_all()

View file

@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
import logging
"""
Plex Companion listener
"""
from logging import getLogger
from re import sub
from SocketServer import ThreadingMixIn
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
@ -7,40 +9,33 @@ from urlparse import urlparse, parse_qs
from xbmc import sleep
from companion import process_command
from utils import window
import json_rpc as js
from clientinfo import getXArgsDeviceInfo
import variables as v
###############################################################################
log = logging.getLogger("PLEX."+__name__)
LOG = getLogger("PLEX." + __name__)
###############################################################################
class MyHandler(BaseHTTPRequestHandler):
"""
BaseHTTPRequestHandler implementation of Plex Companion listener
"""
protocol_version = 'HTTP/1.1'
def __init__(self, *args, **kwargs):
BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
self.serverlist = []
def getServerByHost(self, host):
if len(self.serverlist) == 1:
return self.serverlist[0]
for server in self.serverlist:
if (server.get('serverName') in host or
server.get('server') in host):
return server
return {}
def do_HEAD(self):
log.debug("Serving HEAD request...")
LOG.debug("Serving HEAD request...")
self.answer_request(0)
def do_GET(self):
log.debug("Serving GET request...")
LOG.debug("Serving GET request...")
self.answer_request(1)
def do_OPTIONS(self):
@ -65,7 +60,8 @@ class MyHandler(BaseHTTPRequestHandler):
def sendOK(self):
self.send_response(200)
def response(self, body, headers={}, code=200):
def response(self, body, headers=None, code=200):
headers = {} if headers is None else headers
try:
self.send_response(code)
for key in headers:
@ -78,9 +74,9 @@ class MyHandler(BaseHTTPRequestHandler):
except:
pass
def answer_request(self, sendData):
def answer_request(self, send_data):
self.serverlist = self.server.client.getServerList()
subMgr = self.server.subscriptionManager
sub_mgr = self.server.subscription_manager
try:
request_path = self.path[1:]
@ -90,9 +86,9 @@ class MyHandler(BaseHTTPRequestHandler):
params = {}
for key in paramarrays:
params[key] = paramarrays[key][0]
log.debug("remote request_path: %s" % request_path)
log.debug("params received from remote: %s" % params)
subMgr.updateCommandID(self.headers.get(
LOG.debug("remote request_path: %s", request_path)
LOG.debug("params received from remote: %s", params)
sub_mgr.update_command_id(self.headers.get(
'X-Plex-Client-Identifier',
self.client_address[0]),
params.get('commandID', False))
@ -123,7 +119,7 @@ class MyHandler(BaseHTTPRequestHandler):
v.PKC_MACHINE_IDENTIFIER,
v.PLATFORM,
v.ADDON_VERSION))
log.debug("crafted resources response: %s" % resp)
LOG.debug("crafted resources response: %s", resp)
self.response(resp, getXArgsDeviceInfo(include_token=False))
elif "/subscribe" in request_path:
self.response(v.COMPANION_OK_MESSAGE,
@ -132,20 +128,20 @@ class MyHandler(BaseHTTPRequestHandler):
host = self.client_address[0]
port = params.get('port', False)
uuid = self.headers.get('X-Plex-Client-Identifier', "")
commandID = params.get('commandID', 0)
subMgr.addSubscriber(protocol,
host,
port,
uuid,
commandID)
command_id = params.get('commandID', 0)
sub_mgr.add_subscriber(protocol,
host,
port,
uuid,
command_id)
elif "/poll" in request_path:
if params.get('wait', False) == '1':
sleep(950)
commandID = params.get('commandID', 0)
command_id = params.get('commandID', 0)
self.response(
sub(r"INSERTCOMMANDID",
str(commandID),
subMgr.msg(js.get_players())),
str(command_id),
sub_mgr.msg(js.get_players())),
{
'X-Plex-Client-Identifier': v.PKC_MACHINE_IDENTIFIER,
'X-Plex-Protocol': '1.0',
@ -160,29 +156,32 @@ class MyHandler(BaseHTTPRequestHandler):
getXArgsDeviceInfo(include_token=False))
uuid = self.headers.get('X-Plex-Client-Identifier', False) \
or self.client_address[0]
subMgr.removeSubscriber(uuid)
sub_mgr.remove_subscriber(uuid)
else:
# Throw it to companion.py
process_command(request_path, params, self.server.queue)
self.response('', getXArgsDeviceInfo(include_token=False))
subMgr.notify()
sub_mgr.notify()
except:
log.error('Error encountered. Traceback:')
LOG.error('Error encountered. Traceback:')
import traceback
log.error(traceback.print_exc())
LOG.error(traceback.print_exc())
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""
Using ThreadingMixIn Thread magic
"""
daemon_threads = True
def __init__(self, client, subscriptionManager, queue, *args, **kwargs):
def __init__(self, client, subscription_manager, queue, *args, **kwargs):
"""
client: Class handle to plexgdm.plexgdm. We can thus ask for an up-to-
date serverlist without instantiating anything
same for SubscriptionManager
same for SubscriptionMgr
"""
self.client = client
self.subscriptionManager = subscriptionManager
self.subscription_manager = subscription_manager
self.queue = queue
HTTPServer.__init__(self, *args, **kwargs)

View file

@ -6,7 +6,7 @@ from logging import getLogger
from re import sub
from threading import Thread, RLock
import downloadutils
from downloadutils import DownloadUtils as DU
from utils import window, kodi_time_to_millis
import state
import variables as v
@ -22,20 +22,22 @@ LOG = getLogger("PLEX." + __name__)
CONTROLLABLE = {
v.PLEX_TYPE_PHOTO: 'skipPrevious,skipNext,stop',
v.PLEX_TYPE_AUDIO: 'playPause,stop,volume,shuffle,repeat,seekTo,'
'skipPrevious,skipNext,stepBack,stepForward',
'skipPrevious,skipNext,stepBack,stepForward',
v.PLEX_TYPE_VIDEO: 'playPause,stop,volume,shuffle,audioStream,'
'subtitleStream,seekTo,skipPrevious,skipNext,stepBack,stepForward'
'subtitleStream,seekTo,skipPrevious,skipNext,'
'stepBack,stepForward'
}
class SubscriptionManager:
class SubscriptionMgr(object):
"""
Manages Plex companion subscriptions
"""
def __init__(self, RequestMgr, player, mgr):
def __init__(self, request_mgr, player, mgr):
self.serverlist = []
self.subscribers = {}
self.info = {}
self.containerKey = None
self.container_key = None
self.ratingkey = None
self.server = ""
self.protocol = "http"
@ -44,10 +46,9 @@ class SubscriptionManager:
self.last_params = {}
self.lastplayers = {}
self.doUtils = downloadutils.DownloadUtils
self.xbmcplayer = player
self.playqueue = mgr.playqueue
self.RequestMgr = RequestMgr
self.request_mgr = request_mgr
@staticmethod
def _headers():
@ -63,7 +64,7 @@ class SubscriptionManager:
'X-Plex-Protocol': "1.0"
}
def getServerByHost(self, host):
def _server_by_host(self, host):
if len(self.serverlist) == 1:
return self.serverlist[0]
for server in self.serverlist:
@ -72,17 +73,17 @@ class SubscriptionManager:
return server
return {}
def msg(self, players):
def _msg(self, players):
LOG.debug('players: %s', players)
msg = v.XML_HEADER
msg += '<MediaContainer size="3" commandID="INSERTCOMMANDID"'
msg += ' machineIdentifier="%s">\n' % v.PKC_MACHINE_IDENTIFIER
msg += self.get_timeline_xml(players.get(v.KODI_TYPE_AUDIO),
v.PLEX_TYPE_AUDIO)
msg += self.get_timeline_xml(players.get(v.KODI_TYPE_PHOTO),
v.PLEX_TYPE_PHOTO)
msg += self.get_timeline_xml(players.get(v.KODI_TYPE_VIDEO),
v.PLEX_TYPE_VIDEO)
msg += self._timeline_xml(players.get(v.KODI_TYPE_AUDIO),
v.PLEX_TYPE_AUDIO)
msg += self._timeline_xml(players.get(v.KODI_TYPE_PHOTO),
v.PLEX_TYPE_PHOTO)
msg += self._timeline_xml(players.get(v.KODI_TYPE_VIDEO),
v.PLEX_TYPE_VIDEO)
msg += "</MediaContainer>"
LOG.debug('msg is: %s', msg)
return msg
@ -104,7 +105,7 @@ class SubscriptionManager:
state.PLAYER_STATES[playerid]['plex_id']
return key
def get_timeline_xml(self, player, ptype):
def _timeline_xml(self, player, ptype):
if player is None:
return ' <Timeline state="stopped" controllable="%s" type="%s" ' \
'itemType="%s" />\n' % (CONTROLLABLE[ptype], ptype, ptype)
@ -124,7 +125,7 @@ class SubscriptionManager:
muted = '1' if info['muted'] is True else '0'
ret += ' mute="%s"' % muted
pbmc_server = window('pms_server')
server = self.getServerByHost(self.server)
server = self._server_by_host(self.server)
if pbmc_server:
(self.protocol, self.server, self.port) = pbmc_server.split(':')
self.server = self.server.replace('/', '')
@ -136,16 +137,16 @@ class SubscriptionManager:
playqueue = self.playqueue.playqueues[playerid]
key = self._get_container_key(playerid)
if key is not None and key.startswith('/playQueues'):
self.containerKey = key
ret += ' containerKey="%s"' % self.containerKey
self.container_key = key
ret += ' containerKey="%s"' % self.container_key
pos = info['position']
ret += ' playQueueItemID="%s"' % playqueue.items[pos].id or 'null'
ret += ' playQueueID="%s"' % playqueue.id or 'null'
ret += ' playQueueVersion="%s"' % playqueue.version or 'null'
ret += ' guid="%s"' % playqueue.items[pos].guid or 'null'
elif key:
self.containerKey = key
ret += ' containerKey="%s"' % self.containerKey
self.container_key = key
ret += ' containerKey="%s"' % self.container_key
ret += ' machineIdentifier="%s"' % server.get('uuid', "")
ret += ' protocol="%s"' % server.get('protocol', 'http')
ret += ' address="%s"' % server.get('server', self.server)
@ -162,26 +163,34 @@ class SubscriptionManager:
ret += '/>\n'
return ret
def updateCommandID(self, uuid, commandID):
if commandID and self.subscribers.get(uuid, False):
self.subscribers[uuid].commandID = int(commandID)
def update_command_id(self, uuid, command_id):
"""
Updates the Plex Companien client with the machine identifier uuid with
command_id
"""
if command_id and self.subscribers.get(uuid):
self.subscribers[uuid].command_id = int(command_id)
def notify(self, event=False):
self.cleanup()
def notify(self):
"""
Causes PKC to tell the PMS and Plex Companion players to receive a
notification what's being played.
"""
self._cleanup()
# Do we need a check to NOT tell about e.g. PVR/TV and Addon playback?
players = js.get_players()
# fetch the message, subscribers or not, since the server
# will need the info anyway
msg = self.msg(players)
msg = self._msg(players)
if self.subscribers:
with RLock():
for subscriber in self.subscribers.values():
subscriber.send_update(msg, len(players) == 0)
self.notifyServer(players)
subscriber.send_update(msg, not players)
self._notify_server(players)
self.lastplayers = players
return True
def notifyServer(self, players):
def _notify_server(self, players):
for typus, player in players.iteritems():
self._send_pms_notification(
player['playerid'], self._get_pms_params(player['playerid']))
@ -204,10 +213,10 @@ class SubscriptionManager:
'time': kodi_time_to_millis(info['time']),
'duration': kodi_time_to_millis(info['totaltime'])
}
if self.containerKey:
params['containerKey'] = self.containerKey
if self.containerKey is not None and \
self.containerKey.startswith('/playQueues/'):
if self.container_key:
params['containerKey'] = self.container_key
if self.container_key is not None and \
self.container_key.startswith('/playQueues/'):
playqueue = self.playqueue.playqueues[playerid]
params['playQueueVersion'] = playqueue.version
params['playQueueItemID'] = playqueue.id
@ -215,7 +224,7 @@ class SubscriptionManager:
return params
def _send_pms_notification(self, playerid, params):
serv = self.getServerByHost(self.server)
serv = self._server_by_host(self.server)
xargs = self._headers()
playqueue = self.playqueue.playqueues[playerid]
if state.PLEX_TRANSIENT_TOKEN:
@ -225,32 +234,39 @@ class SubscriptionManager:
url = '%s://%s:%s/:/timeline' % (serv.get('protocol', 'http'),
serv.get('server', 'localhost'),
serv.get('port', '32400'))
self.doUtils().downloadUrl(
url, parameters=params, headerOptions=xargs)
DU().downloadUrl(url, parameters=params, headerOptions=xargs)
# Save to be able to signal a stop at the end
LOG.debug("Sent server notification with parameters: %s to %s",
params, url)
def addSubscriber(self, protocol, host, port, uuid, commandID):
def add_subscriber(self, protocol, host, port, uuid, command_id):
"""
Adds a new Plex Companion subscriber to PKC.
"""
subscriber = Subscriber(protocol,
host,
port,
uuid,
commandID,
command_id,
self,
self.RequestMgr)
self.request_mgr)
with RLock():
self.subscribers[subscriber.uuid] = subscriber
return subscriber
def removeSubscriber(self, uuid):
def remove_subscriber(self, uuid):
"""
Removes a connected Plex Companion subscriber with machine identifier
uuid from PKC notifications.
(Calls the cleanup() method of the subscriber)
"""
with RLock():
for subscriber in self.subscribers.values():
if subscriber.uuid == uuid or subscriber.host == uuid:
subscriber.cleanup()
del self.subscribers[subscriber.uuid]
def cleanup(self):
def _cleanup(self):
with RLock():
for subscriber in self.subscribers.values():
if subscriber.age > 30:
@ -258,27 +274,35 @@ class SubscriptionManager:
del self.subscribers[subscriber.uuid]
class Subscriber:
def __init__(self, protocol, host, port, uuid, commandID,
subMgr, RequestMgr):
class Subscriber(object):
"""
Plex Companion subscribing device
"""
def __init__(self, protocol, host, port, uuid, command_id, sub_mgr,
request_mgr):
self.protocol = protocol or "http"
self.host = host
self.port = port or 32400
self.uuid = uuid or host
self.commandID = int(commandID) or 0
self.command_id = int(command_id) or 0
self.navlocationsent = False
self.age = 0
self.doUtils = downloadutils.DownloadUtils
self.subMgr = subMgr
self.RequestMgr = RequestMgr
self.sub_mgr = sub_mgr
self.request_mgr = request_mgr
def __eq__(self, other):
return self.uuid == other.uuid
def cleanup(self):
self.RequestMgr.closeConnection(self.protocol, self.host, self.port)
"""
Closes the connection to the Plex Companion client
"""
self.request_mgr.closeConnection(self.protocol, self.host, self.port)
def send_update(self, msg, is_nav):
"""
Sends msg to the Plex Companion client (via .../:/timeline)
"""
self.age += 1
if not is_nav:
self.navlocationsent = False
@ -286,21 +310,19 @@ class Subscriber:
return True
else:
self.navlocationsent = True
msg = sub(r"INSERTCOMMANDID", str(self.commandID), msg)
msg = sub(r"INSERTCOMMANDID", str(self.command_id), msg)
LOG.debug("sending xml to subscriber uuid=%s,commandID=%i:\n%s",
self.uuid, self.commandID, msg)
self.uuid, self.command_id, msg)
url = self.protocol + '://' + self.host + ':' + self.port \
+ "/:/timeline"
t = Thread(target=self.threadedSend, args=(url, msg))
t.start()
thread = Thread(target=self._threaded_send, args=(url, msg))
thread.start()
def threadedSend(self, url, msg):
def _threaded_send(self, url, msg):
"""
Threaded POST request, because they stall due to PMS response missing
the Content-Length header :-(
"""
response = self.doUtils().downloadUrl(url,
postBody=msg,
action_type="POST")
if response in [False, None, 401]:
self.subMgr.removeSubscriber(self.uuid)
response = DU().downloadUrl(url, postBody=msg, action_type="POST")
if response in (False, None, 401):
self.sub_mgr.remove_subscriber(self.uuid)