From 274ed4b43086a5f9d7e2c72998086e2a22facc2e Mon Sep 17 00:00:00 2001 From: croneter Date: Fri, 8 Sep 2017 12:06:31 +0200 Subject: [PATCH] Background sync now picks up more PMS changes --- resources/lib/librarysync.py | 278 ++++++++++++++++++------------ resources/lib/websocket_client.py | 33 ++-- 2 files changed, 182 insertions(+), 129 deletions(-) diff --git a/resources/lib/librarysync.py b/resources/lib/librarysync.py index 205c7e32..a9152cc0 100644 --- a/resources/lib/librarysync.py +++ b/resources/lib/librarysync.py @@ -21,7 +21,8 @@ import videonodes import variables as v from PlexFunctions import GetPlexMetadata, GetAllPlexLeaves, scrobble, \ - GetPlexSectionResults, GetAllPlexChildren, GetPMSStatus, get_plex_sections + GetPlexSectionResults, GetPlexKeyNumber, GetPMSStatus, get_plex_sections, \ + GetAllPlexChildren import PlexAPI from library_sync.get_metadata import Threaded_Get_Metadata from library_sync.process_metadata import Threaded_Process_Metadata @@ -1075,11 +1076,24 @@ class LibrarySync(Thread): processes json.loads() messages from websocket. Triage what we need to do with "process_" methods """ - typus = message.get('type') - if typus == 'playing': - self.process_playing(message['PlaySessionStateNotification']) - elif typus == 'timeline': - self.process_timeline(message['TimelineEntry']) + if message['type'] == 'playing': + try: + self.process_playing(message['PlaySessionStateNotification']) + except KeyError: + log.error('Received invalid PMS message for playstate: %s' + % message) + elif message['type'] == 'timeline': + try: + self.process_timeline(message['TimelineEntry']) + except (KeyError, ValueError): + log.error('Received invalid PMS message for timeline: %s' + % message) + elif message['type'] == 'activity': + try: + self.process_activity(message['ActivityNotification']) + except KeyError: + log.error('Received invalid PMS message for activity: %s' + % message) def multi_delete(self, liste, deleteListe): """ @@ -1134,11 +1148,10 @@ class LibrarySync(Thread): else: successful = self.process_newitems(item) if successful and settings('FanartTV') == 'true': - plex_type = v.PLEX_TYPE_FROM_WEBSOCKET[item['type']] - if plex_type in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW): + if item['type'] in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW): self.fanartqueue.put({ 'plex_id': item['ratingKey'], - 'plex_type': plex_type, + 'plex_type': item['type'], 'refresh': False }) if successful is True: @@ -1194,22 +1207,25 @@ class LibrarySync(Thread): return True def process_deleteditems(self, item): - if item.get('type') == 1: - log.debug("Removing movie %s" % item.get('ratingKey')) + if item['type'] == v.PLEX_TYPE_MOVIE: + log.debug("Removing movie %s" % item['ratingKey']) self.videoLibUpdate = True with itemtypes.Movies() as movie: - movie.remove(item.get('ratingKey')) - elif item.get('type') in (2, 3, 4): - log.debug("Removing episode/season/tv show %s" - % item.get('ratingKey')) + movie.remove(item['ratingKey']) + elif item['type'] in (v.PLEX_TYPE_SHOW, + v.PLEX_TYPE_SEASON, + v.PLEX_TYPE_EPISODE): + log.debug("Removing episode/season/tv show %s" % item['ratingKey']) self.videoLibUpdate = True with itemtypes.TVShows() as show: - show.remove(item.get('ratingKey')) - elif item.get('type') in (8, 9, 10): - log.debug("Removing song/album/artist %s" % item.get('ratingKey')) + show.remove(item['ratingKey']) + elif item['type'] in (v.PLEX_TYPE_ARTIST, + v.PLEX_TYPE_ALBUM, + v.PLEX_TYPE_SONG): + log.debug("Removing song/album/artist %s" % item['ratingKey']) self.musicLibUpdate = True with itemtypes.Music() as music: - music.remove(item.get('ratingKey')) + music.remove(item['ratingKey']) return True def process_timeline(self, data): @@ -1223,14 +1239,13 @@ class LibrarySync(Thread): # (DVR ratingKeys are not unique and might correspond to a # movie or episode) continue - typus = int(item.get('type', 0)) - status = int(item.get('state', 0)) - if status == 9 or (typus in (1, 4, 10) and status == 5): + typus = v.PLEX_TYPE_FROM_WEBSOCKET[int(item['type'])] + status = int(item['state']) + if status == 9 or (typus in (v.PLEX_TYPE_MOVIE, + v.PLEX_TYPE_EPISODE, + v.PLEX_TYPE_SONG) and status == 5): # Only process deleted items OR movies, episodes, tracks/songs - plex_id = str(item.get('itemID', '0')) - if plex_id == '0': - log.error('Received malformed PMS message: %s' % item) - continue + plex_id = str(item['itemID']) # Have we already added this element? for existingItem in self.itemsToProcess: if existingItem['ratingKey'] == plex_id: @@ -1245,101 +1260,136 @@ class LibrarySync(Thread): 'attempt': 0 }) + def process_activity(self, data): + """ + PMS is re-scanning an item, e.g. after having changed a movie poster. + WATCH OUT for this if it's triggered by our PKC library scan! + """ + for item in data: + if item['event'] != 'ended': + # Scan still going on, so skip for now + continue + elif item['Activity']['type'] != 'library.refresh.items': + # Not the type of message relevant for us + continue + plex_id = GetPlexKeyNumber(item['Activity']['Context']['key'])[1] + if plex_id == '': + raise KeyError('Could not extract the Plex id') + # We're only looking at existing elements - have we synced yet? + with plexdb.Get_Plex_DB() as plex_db: + kodi_info = plex_db.getItem_byId(plex_id) + if kodi_info is None: + log.debug('Plex id %s not synced yet - skipping' % plex_id) + continue + # Have we already added this element? + for existingItem in self.itemsToProcess: + if existingItem['ratingKey'] == plex_id: + break + else: + # Haven't added this element to the queue yet + self.itemsToProcess.append({ + 'state': None, # Don't need a state here + 'type': kodi_info[5], + 'ratingKey': plex_id, + 'timestamp': getUnixTimestamp(), + 'attempt': 0 + }) + def process_playing(self, data): """ Someone (not necessarily the user signed in) is playing something some- where """ items = [] - with plexdb.Get_Plex_DB() as plex_db: - for item in data: - # Drop buffering messages immediately - status = item.get('state') - if status == 'buffering': - continue - ratingKey = item.get('ratingKey') - kodiInfo = plex_db.getItem_byId(ratingKey) - if kodiInfo is None: - # Item not (yet) in Kodi library - continue - sessionKey = item.get('sessionKey') - # Do we already have a sessionKey stored? - if sessionKey not in self.sessionKeys: - if settings('plex_serverowned') == 'false': - # Not our PMS, we are not authorized to get the - # sessions - # On the bright side, it must be us playing :-) - self.sessionKeys = { - sessionKey: {} - } - else: - # PMS is ours - get all current sessions - self.sessionKeys = GetPMSStatus(state.PLEX_TOKEN) - log.debug('Updated current sessions. They are: %s' - % self.sessionKeys) - if sessionKey not in self.sessionKeys: - log.warn('Session key %s still unknown! Skip ' - 'item' % sessionKey) - continue - - currSess = self.sessionKeys[sessionKey] - if settings('plex_serverowned') != 'false': - # Identify the user - same one as signed on with PKC? Skip - # update if neither session's username nor userid match - # (Owner sometime's returns id '1', not always) - if (not state.PLEX_TOKEN and currSess['userId'] == '1'): - # PKC not signed in to plex.tv. Plus owner of PMS is - # playing (the '1'). - # Hence must be us (since several users require plex.tv - # token for PKC) - pass - elif not (currSess['userId'] == state.PLEX_USER_ID - or - currSess['username'] == state.PLEX_USERNAME): - log.debug('Our username %s, userid %s did not match ' - 'the session username %s with userid %s' - % (state.PLEX_USERNAME, - state.PLEX_USER_ID, - currSess['username'], - currSess['userId'])) - continue - - # Get an up-to-date XML from the PMS - # because PMS will NOT directly tell us: - # duration of item - # viewCount - if currSess.get('duration') is None: - xml = GetPlexMetadata(ratingKey) - if xml in (None, 401): - log.error('Could not get up-to-date xml for item %s' - % ratingKey) - continue - API = PlexAPI.API(xml[0]) - userdata = API.getUserData() - currSess['duration'] = userdata['Runtime'] - currSess['viewCount'] = userdata['PlayCount'] - # Sometimes, Plex tells us resume points in milliseconds and - # not in seconds - thank you very much! - if item.get('viewOffset') > currSess['duration']: - resume = item.get('viewOffset') / 1000 + for item in data: + # Drop buffering messages immediately + status = item['state'] + if status == 'buffering': + continue + ratingKey = str(item['ratingKey']) + with plexdb.Get_Plex_DB() as plex_db: + kodi_info = plex_db.getItem_byId(ratingKey) + if kodi_info is None: + # Item not (yet) in Kodi library + continue + sessionKey = item['sessionKey'] + # Do we already have a sessionKey stored? + if sessionKey not in self.sessionKeys: + if settings('plex_serverowned') == 'false': + # Not our PMS, we are not authorized to get the + # sessions + # On the bright side, it must be us playing :-) + self.sessionKeys = { + sessionKey: {} + } else: - resume = item.get('viewOffset') - # Append to list that we need to process - items.append({ - 'ratingKey': ratingKey, - 'kodi_id': kodiInfo[0], - 'file_id': kodiInfo[1], - 'kodi_type': kodiInfo[4], - 'viewOffset': resume, - 'state': status, - 'duration': currSess['duration'], - 'viewCount': currSess['viewCount'], - 'lastViewedAt': DateToKodi(getUnixTimestamp()) - }) - log.debug('Update playstate for user %s with id %s: %s' - % (state.PLEX_USERNAME, - state.PLEX_USER_ID, - items[-1])) + # PMS is ours - get all current sessions + self.sessionKeys = GetPMSStatus(state.PLEX_TOKEN) + log.debug('Updated current sessions. They are: %s' + % self.sessionKeys) + if sessionKey not in self.sessionKeys: + log.warn('Session key %s still unknown! Skip ' + 'item' % sessionKey) + continue + + currSess = self.sessionKeys[sessionKey] + if settings('plex_serverowned') != 'false': + # Identify the user - same one as signed on with PKC? Skip + # update if neither session's username nor userid match + # (Owner sometime's returns id '1', not always) + if (not state.PLEX_TOKEN and currSess['userId'] == '1'): + # PKC not signed in to plex.tv. Plus owner of PMS is + # playing (the '1'). + # Hence must be us (since several users require plex.tv + # token for PKC) + pass + elif not (currSess['userId'] == state.PLEX_USER_ID + or + currSess['username'] == state.PLEX_USERNAME): + log.debug('Our username %s, userid %s did not match ' + 'the session username %s with userid %s' + % (state.PLEX_USERNAME, + state.PLEX_USER_ID, + currSess['username'], + currSess['userId'])) + continue + + # Get an up-to-date XML from the PMS + # because PMS will NOT directly tell us: + # duration of item + # viewCount + if currSess.get('duration') is None: + xml = GetPlexMetadata(ratingKey) + if xml in (None, 401): + log.error('Could not get up-to-date xml for item %s' + % ratingKey) + continue + API = PlexAPI.API(xml[0]) + userdata = API.getUserData() + currSess['duration'] = userdata['Runtime'] + currSess['viewCount'] = userdata['PlayCount'] + # Sometimes, Plex tells us resume points in milliseconds and + # not in seconds - thank you very much! + if item.get('viewOffset') > currSess['duration']: + resume = item.get('viewOffset') / 1000 + else: + resume = item.get('viewOffset') + # Append to list that we need to process + items.append({ + 'ratingKey': ratingKey, + 'kodi_id': kodi_info[0], + 'file_id': kodi_info[1], + 'kodi_type': kodi_info[4], + 'viewOffset': resume, + 'state': status, + 'duration': currSess['duration'], + 'viewCount': currSess['viewCount'], + 'lastViewedAt': DateToKodi(getUnixTimestamp()) + }) + log.debug('Update playstate for user %s with id %s: %s' + % (state.PLEX_USERNAME, + state.PLEX_USER_ID, + items[-1])) # Now tell Kodi where we are for item in items: itemFkt = getattr(itemtypes, diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index ccebfb7b..ba3f97e1 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -186,7 +186,7 @@ class PMS_Websocket(WebSocket): def process(self, opcode, message): if opcode not in self.opcode_data: - return False + return try: message = loads(message) @@ -194,28 +194,32 @@ class PMS_Websocket(WebSocket): log.error('%s: Error decoding message from websocket' % self.__class__.__name__) log.error(message) - return False + return try: message = message['NotificationContainer'] except KeyError: log.error('%s: Could not parse PMS message: %s' % (self.__class__.__name__, message)) - return False + return # Triage typus = message.get('type') if typus is None: log.error('%s: No message type, dropping message: %s' % (self.__class__.__name__, message)) - return False + return log.debug('%s: Received message from PMS server: %s' % (self.__class__.__name__, message)) # Drop everything we're not interested in - if typus not in ('playing', 'timeline'): - return True - - # Put PMS message on queue and let libsync take care of it - self.queue.put(message) - return True + if typus not in ('playing', 'timeline', 'activity'): + return + elif typus == 'activity' and state.DB_SCAN is True: + # Only add to processing if PKC is NOT doing a lib scan (and thus + # possibly causing these reprocessing messages en mass) + log.debug('%s: Dropping message as PKC is currently synching' + % self.__class__.__name__) + else: + # Put PMS message on queue and let libsync take care of it + self.queue.put(message) def IOError_response(self): log.warn("Repeatedly could not connect to PMS, " @@ -244,7 +248,7 @@ class Alexa_Websocket(WebSocket): def process(self, opcode, message): if opcode not in self.opcode_data: - return False + return log.debug('%s: Received the following message from Alexa:' % self.__class__.__name__) log.debug('%s: %s' % (self.__class__.__name__, message)) @@ -253,22 +257,21 @@ class Alexa_Websocket(WebSocket): except Exception as ex: log.error('%s: Error decoding message from Alexa: %s' % (self.__class__.__name__, ex)) - return False + return try: if message.attrib['command'] == 'processRemoteControlCommand': message = message[0] else: log.error('%s: Unknown Alexa message received' % self.__class__.__name__) - return False + return except: log.error('%s: Could not parse Alexa message' % self.__class__.__name__) - return False + return process_command(message.attrib['path'][1:], message.attrib, queue=self.mgr.plexCompanion.queue) - return True def IOError_response(self): pass