Background sync now picks up more PMS changes
This commit is contained in:
parent
5fcccba105
commit
274ed4b430
2 changed files with 182 additions and 129 deletions
|
@ -21,7 +21,8 @@ import videonodes
|
||||||
import variables as v
|
import variables as v
|
||||||
|
|
||||||
from PlexFunctions import GetPlexMetadata, GetAllPlexLeaves, scrobble, \
|
from PlexFunctions import GetPlexMetadata, GetAllPlexLeaves, scrobble, \
|
||||||
GetPlexSectionResults, GetAllPlexChildren, GetPMSStatus, get_plex_sections
|
GetPlexSectionResults, GetPlexKeyNumber, GetPMSStatus, get_plex_sections, \
|
||||||
|
GetAllPlexChildren
|
||||||
import PlexAPI
|
import PlexAPI
|
||||||
from library_sync.get_metadata import Threaded_Get_Metadata
|
from library_sync.get_metadata import Threaded_Get_Metadata
|
||||||
from library_sync.process_metadata import Threaded_Process_Metadata
|
from library_sync.process_metadata import Threaded_Process_Metadata
|
||||||
|
@ -1075,11 +1076,24 @@ class LibrarySync(Thread):
|
||||||
processes json.loads() messages from websocket. Triage what we need to
|
processes json.loads() messages from websocket. Triage what we need to
|
||||||
do with "process_" methods
|
do with "process_" methods
|
||||||
"""
|
"""
|
||||||
typus = message.get('type')
|
if message['type'] == 'playing':
|
||||||
if typus == 'playing':
|
try:
|
||||||
self.process_playing(message['PlaySessionStateNotification'])
|
self.process_playing(message['PlaySessionStateNotification'])
|
||||||
elif typus == 'timeline':
|
except KeyError:
|
||||||
self.process_timeline(message['TimelineEntry'])
|
log.error('Received invalid PMS message for playstate: %s'
|
||||||
|
% message)
|
||||||
|
elif message['type'] == 'timeline':
|
||||||
|
try:
|
||||||
|
self.process_timeline(message['TimelineEntry'])
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
log.error('Received invalid PMS message for timeline: %s'
|
||||||
|
% message)
|
||||||
|
elif message['type'] == 'activity':
|
||||||
|
try:
|
||||||
|
self.process_activity(message['ActivityNotification'])
|
||||||
|
except KeyError:
|
||||||
|
log.error('Received invalid PMS message for activity: %s'
|
||||||
|
% message)
|
||||||
|
|
||||||
def multi_delete(self, liste, deleteListe):
|
def multi_delete(self, liste, deleteListe):
|
||||||
"""
|
"""
|
||||||
|
@ -1134,11 +1148,10 @@ class LibrarySync(Thread):
|
||||||
else:
|
else:
|
||||||
successful = self.process_newitems(item)
|
successful = self.process_newitems(item)
|
||||||
if successful and settings('FanartTV') == 'true':
|
if successful and settings('FanartTV') == 'true':
|
||||||
plex_type = v.PLEX_TYPE_FROM_WEBSOCKET[item['type']]
|
if item['type'] in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW):
|
||||||
if plex_type in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW):
|
|
||||||
self.fanartqueue.put({
|
self.fanartqueue.put({
|
||||||
'plex_id': item['ratingKey'],
|
'plex_id': item['ratingKey'],
|
||||||
'plex_type': plex_type,
|
'plex_type': item['type'],
|
||||||
'refresh': False
|
'refresh': False
|
||||||
})
|
})
|
||||||
if successful is True:
|
if successful is True:
|
||||||
|
@ -1194,22 +1207,25 @@ class LibrarySync(Thread):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def process_deleteditems(self, item):
|
def process_deleteditems(self, item):
|
||||||
if item.get('type') == 1:
|
if item['type'] == v.PLEX_TYPE_MOVIE:
|
||||||
log.debug("Removing movie %s" % item.get('ratingKey'))
|
log.debug("Removing movie %s" % item['ratingKey'])
|
||||||
self.videoLibUpdate = True
|
self.videoLibUpdate = True
|
||||||
with itemtypes.Movies() as movie:
|
with itemtypes.Movies() as movie:
|
||||||
movie.remove(item.get('ratingKey'))
|
movie.remove(item['ratingKey'])
|
||||||
elif item.get('type') in (2, 3, 4):
|
elif item['type'] in (v.PLEX_TYPE_SHOW,
|
||||||
log.debug("Removing episode/season/tv show %s"
|
v.PLEX_TYPE_SEASON,
|
||||||
% item.get('ratingKey'))
|
v.PLEX_TYPE_EPISODE):
|
||||||
|
log.debug("Removing episode/season/tv show %s" % item['ratingKey'])
|
||||||
self.videoLibUpdate = True
|
self.videoLibUpdate = True
|
||||||
with itemtypes.TVShows() as show:
|
with itemtypes.TVShows() as show:
|
||||||
show.remove(item.get('ratingKey'))
|
show.remove(item['ratingKey'])
|
||||||
elif item.get('type') in (8, 9, 10):
|
elif item['type'] in (v.PLEX_TYPE_ARTIST,
|
||||||
log.debug("Removing song/album/artist %s" % item.get('ratingKey'))
|
v.PLEX_TYPE_ALBUM,
|
||||||
|
v.PLEX_TYPE_SONG):
|
||||||
|
log.debug("Removing song/album/artist %s" % item['ratingKey'])
|
||||||
self.musicLibUpdate = True
|
self.musicLibUpdate = True
|
||||||
with itemtypes.Music() as music:
|
with itemtypes.Music() as music:
|
||||||
music.remove(item.get('ratingKey'))
|
music.remove(item['ratingKey'])
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def process_timeline(self, data):
|
def process_timeline(self, data):
|
||||||
|
@ -1223,14 +1239,13 @@ class LibrarySync(Thread):
|
||||||
# (DVR ratingKeys are not unique and might correspond to a
|
# (DVR ratingKeys are not unique and might correspond to a
|
||||||
# movie or episode)
|
# movie or episode)
|
||||||
continue
|
continue
|
||||||
typus = int(item.get('type', 0))
|
typus = v.PLEX_TYPE_FROM_WEBSOCKET[int(item['type'])]
|
||||||
status = int(item.get('state', 0))
|
status = int(item['state'])
|
||||||
if status == 9 or (typus in (1, 4, 10) and status == 5):
|
if status == 9 or (typus in (v.PLEX_TYPE_MOVIE,
|
||||||
|
v.PLEX_TYPE_EPISODE,
|
||||||
|
v.PLEX_TYPE_SONG) and status == 5):
|
||||||
# Only process deleted items OR movies, episodes, tracks/songs
|
# Only process deleted items OR movies, episodes, tracks/songs
|
||||||
plex_id = str(item.get('itemID', '0'))
|
plex_id = str(item['itemID'])
|
||||||
if plex_id == '0':
|
|
||||||
log.error('Received malformed PMS message: %s' % item)
|
|
||||||
continue
|
|
||||||
# Have we already added this element?
|
# Have we already added this element?
|
||||||
for existingItem in self.itemsToProcess:
|
for existingItem in self.itemsToProcess:
|
||||||
if existingItem['ratingKey'] == plex_id:
|
if existingItem['ratingKey'] == plex_id:
|
||||||
|
@ -1245,101 +1260,136 @@ class LibrarySync(Thread):
|
||||||
'attempt': 0
|
'attempt': 0
|
||||||
})
|
})
|
||||||
|
|
||||||
|
def process_activity(self, data):
|
||||||
|
"""
|
||||||
|
PMS is re-scanning an item, e.g. after having changed a movie poster.
|
||||||
|
WATCH OUT for this if it's triggered by our PKC library scan!
|
||||||
|
"""
|
||||||
|
for item in data:
|
||||||
|
if item['event'] != 'ended':
|
||||||
|
# Scan still going on, so skip for now
|
||||||
|
continue
|
||||||
|
elif item['Activity']['type'] != 'library.refresh.items':
|
||||||
|
# Not the type of message relevant for us
|
||||||
|
continue
|
||||||
|
plex_id = GetPlexKeyNumber(item['Activity']['Context']['key'])[1]
|
||||||
|
if plex_id == '':
|
||||||
|
raise KeyError('Could not extract the Plex id')
|
||||||
|
# We're only looking at existing elements - have we synced yet?
|
||||||
|
with plexdb.Get_Plex_DB() as plex_db:
|
||||||
|
kodi_info = plex_db.getItem_byId(plex_id)
|
||||||
|
if kodi_info is None:
|
||||||
|
log.debug('Plex id %s not synced yet - skipping' % plex_id)
|
||||||
|
continue
|
||||||
|
# Have we already added this element?
|
||||||
|
for existingItem in self.itemsToProcess:
|
||||||
|
if existingItem['ratingKey'] == plex_id:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# Haven't added this element to the queue yet
|
||||||
|
self.itemsToProcess.append({
|
||||||
|
'state': None, # Don't need a state here
|
||||||
|
'type': kodi_info[5],
|
||||||
|
'ratingKey': plex_id,
|
||||||
|
'timestamp': getUnixTimestamp(),
|
||||||
|
'attempt': 0
|
||||||
|
})
|
||||||
|
|
||||||
def process_playing(self, data):
|
def process_playing(self, data):
|
||||||
"""
|
"""
|
||||||
Someone (not necessarily the user signed in) is playing something some-
|
Someone (not necessarily the user signed in) is playing something some-
|
||||||
where
|
where
|
||||||
"""
|
"""
|
||||||
items = []
|
items = []
|
||||||
with plexdb.Get_Plex_DB() as plex_db:
|
for item in data:
|
||||||
for item in data:
|
# Drop buffering messages immediately
|
||||||
# Drop buffering messages immediately
|
status = item['state']
|
||||||
status = item.get('state')
|
if status == 'buffering':
|
||||||
if status == 'buffering':
|
continue
|
||||||
continue
|
ratingKey = str(item['ratingKey'])
|
||||||
ratingKey = item.get('ratingKey')
|
with plexdb.Get_Plex_DB() as plex_db:
|
||||||
kodiInfo = plex_db.getItem_byId(ratingKey)
|
kodi_info = plex_db.getItem_byId(ratingKey)
|
||||||
if kodiInfo is None:
|
if kodi_info is None:
|
||||||
# Item not (yet) in Kodi library
|
# Item not (yet) in Kodi library
|
||||||
continue
|
continue
|
||||||
sessionKey = item.get('sessionKey')
|
sessionKey = item['sessionKey']
|
||||||
# Do we already have a sessionKey stored?
|
# Do we already have a sessionKey stored?
|
||||||
if sessionKey not in self.sessionKeys:
|
if sessionKey not in self.sessionKeys:
|
||||||
if settings('plex_serverowned') == 'false':
|
if settings('plex_serverowned') == 'false':
|
||||||
# Not our PMS, we are not authorized to get the
|
# Not our PMS, we are not authorized to get the
|
||||||
# sessions
|
# sessions
|
||||||
# On the bright side, it must be us playing :-)
|
# On the bright side, it must be us playing :-)
|
||||||
self.sessionKeys = {
|
self.sessionKeys = {
|
||||||
sessionKey: {}
|
sessionKey: {}
|
||||||
}
|
}
|
||||||
else:
|
|
||||||
# PMS is ours - get all current sessions
|
|
||||||
self.sessionKeys = GetPMSStatus(state.PLEX_TOKEN)
|
|
||||||
log.debug('Updated current sessions. They are: %s'
|
|
||||||
% self.sessionKeys)
|
|
||||||
if sessionKey not in self.sessionKeys:
|
|
||||||
log.warn('Session key %s still unknown! Skip '
|
|
||||||
'item' % sessionKey)
|
|
||||||
continue
|
|
||||||
|
|
||||||
currSess = self.sessionKeys[sessionKey]
|
|
||||||
if settings('plex_serverowned') != 'false':
|
|
||||||
# Identify the user - same one as signed on with PKC? Skip
|
|
||||||
# update if neither session's username nor userid match
|
|
||||||
# (Owner sometime's returns id '1', not always)
|
|
||||||
if (not state.PLEX_TOKEN and currSess['userId'] == '1'):
|
|
||||||
# PKC not signed in to plex.tv. Plus owner of PMS is
|
|
||||||
# playing (the '1').
|
|
||||||
# Hence must be us (since several users require plex.tv
|
|
||||||
# token for PKC)
|
|
||||||
pass
|
|
||||||
elif not (currSess['userId'] == state.PLEX_USER_ID
|
|
||||||
or
|
|
||||||
currSess['username'] == state.PLEX_USERNAME):
|
|
||||||
log.debug('Our username %s, userid %s did not match '
|
|
||||||
'the session username %s with userid %s'
|
|
||||||
% (state.PLEX_USERNAME,
|
|
||||||
state.PLEX_USER_ID,
|
|
||||||
currSess['username'],
|
|
||||||
currSess['userId']))
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Get an up-to-date XML from the PMS
|
|
||||||
# because PMS will NOT directly tell us:
|
|
||||||
# duration of item
|
|
||||||
# viewCount
|
|
||||||
if currSess.get('duration') is None:
|
|
||||||
xml = GetPlexMetadata(ratingKey)
|
|
||||||
if xml in (None, 401):
|
|
||||||
log.error('Could not get up-to-date xml for item %s'
|
|
||||||
% ratingKey)
|
|
||||||
continue
|
|
||||||
API = PlexAPI.API(xml[0])
|
|
||||||
userdata = API.getUserData()
|
|
||||||
currSess['duration'] = userdata['Runtime']
|
|
||||||
currSess['viewCount'] = userdata['PlayCount']
|
|
||||||
# Sometimes, Plex tells us resume points in milliseconds and
|
|
||||||
# not in seconds - thank you very much!
|
|
||||||
if item.get('viewOffset') > currSess['duration']:
|
|
||||||
resume = item.get('viewOffset') / 1000
|
|
||||||
else:
|
else:
|
||||||
resume = item.get('viewOffset')
|
# PMS is ours - get all current sessions
|
||||||
# Append to list that we need to process
|
self.sessionKeys = GetPMSStatus(state.PLEX_TOKEN)
|
||||||
items.append({
|
log.debug('Updated current sessions. They are: %s'
|
||||||
'ratingKey': ratingKey,
|
% self.sessionKeys)
|
||||||
'kodi_id': kodiInfo[0],
|
if sessionKey not in self.sessionKeys:
|
||||||
'file_id': kodiInfo[1],
|
log.warn('Session key %s still unknown! Skip '
|
||||||
'kodi_type': kodiInfo[4],
|
'item' % sessionKey)
|
||||||
'viewOffset': resume,
|
continue
|
||||||
'state': status,
|
|
||||||
'duration': currSess['duration'],
|
currSess = self.sessionKeys[sessionKey]
|
||||||
'viewCount': currSess['viewCount'],
|
if settings('plex_serverowned') != 'false':
|
||||||
'lastViewedAt': DateToKodi(getUnixTimestamp())
|
# Identify the user - same one as signed on with PKC? Skip
|
||||||
})
|
# update if neither session's username nor userid match
|
||||||
log.debug('Update playstate for user %s with id %s: %s'
|
# (Owner sometime's returns id '1', not always)
|
||||||
% (state.PLEX_USERNAME,
|
if (not state.PLEX_TOKEN and currSess['userId'] == '1'):
|
||||||
state.PLEX_USER_ID,
|
# PKC not signed in to plex.tv. Plus owner of PMS is
|
||||||
items[-1]))
|
# playing (the '1').
|
||||||
|
# Hence must be us (since several users require plex.tv
|
||||||
|
# token for PKC)
|
||||||
|
pass
|
||||||
|
elif not (currSess['userId'] == state.PLEX_USER_ID
|
||||||
|
or
|
||||||
|
currSess['username'] == state.PLEX_USERNAME):
|
||||||
|
log.debug('Our username %s, userid %s did not match '
|
||||||
|
'the session username %s with userid %s'
|
||||||
|
% (state.PLEX_USERNAME,
|
||||||
|
state.PLEX_USER_ID,
|
||||||
|
currSess['username'],
|
||||||
|
currSess['userId']))
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get an up-to-date XML from the PMS
|
||||||
|
# because PMS will NOT directly tell us:
|
||||||
|
# duration of item
|
||||||
|
# viewCount
|
||||||
|
if currSess.get('duration') is None:
|
||||||
|
xml = GetPlexMetadata(ratingKey)
|
||||||
|
if xml in (None, 401):
|
||||||
|
log.error('Could not get up-to-date xml for item %s'
|
||||||
|
% ratingKey)
|
||||||
|
continue
|
||||||
|
API = PlexAPI.API(xml[0])
|
||||||
|
userdata = API.getUserData()
|
||||||
|
currSess['duration'] = userdata['Runtime']
|
||||||
|
currSess['viewCount'] = userdata['PlayCount']
|
||||||
|
# Sometimes, Plex tells us resume points in milliseconds and
|
||||||
|
# not in seconds - thank you very much!
|
||||||
|
if item.get('viewOffset') > currSess['duration']:
|
||||||
|
resume = item.get('viewOffset') / 1000
|
||||||
|
else:
|
||||||
|
resume = item.get('viewOffset')
|
||||||
|
# Append to list that we need to process
|
||||||
|
items.append({
|
||||||
|
'ratingKey': ratingKey,
|
||||||
|
'kodi_id': kodi_info[0],
|
||||||
|
'file_id': kodi_info[1],
|
||||||
|
'kodi_type': kodi_info[4],
|
||||||
|
'viewOffset': resume,
|
||||||
|
'state': status,
|
||||||
|
'duration': currSess['duration'],
|
||||||
|
'viewCount': currSess['viewCount'],
|
||||||
|
'lastViewedAt': DateToKodi(getUnixTimestamp())
|
||||||
|
})
|
||||||
|
log.debug('Update playstate for user %s with id %s: %s'
|
||||||
|
% (state.PLEX_USERNAME,
|
||||||
|
state.PLEX_USER_ID,
|
||||||
|
items[-1]))
|
||||||
# Now tell Kodi where we are
|
# Now tell Kodi where we are
|
||||||
for item in items:
|
for item in items:
|
||||||
itemFkt = getattr(itemtypes,
|
itemFkt = getattr(itemtypes,
|
||||||
|
|
|
@ -186,7 +186,7 @@ class PMS_Websocket(WebSocket):
|
||||||
|
|
||||||
def process(self, opcode, message):
|
def process(self, opcode, message):
|
||||||
if opcode not in self.opcode_data:
|
if opcode not in self.opcode_data:
|
||||||
return False
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
message = loads(message)
|
message = loads(message)
|
||||||
|
@ -194,28 +194,32 @@ class PMS_Websocket(WebSocket):
|
||||||
log.error('%s: Error decoding message from websocket'
|
log.error('%s: Error decoding message from websocket'
|
||||||
% self.__class__.__name__)
|
% self.__class__.__name__)
|
||||||
log.error(message)
|
log.error(message)
|
||||||
return False
|
return
|
||||||
try:
|
try:
|
||||||
message = message['NotificationContainer']
|
message = message['NotificationContainer']
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.error('%s: Could not parse PMS message: %s'
|
log.error('%s: Could not parse PMS message: %s'
|
||||||
% (self.__class__.__name__, message))
|
% (self.__class__.__name__, message))
|
||||||
return False
|
return
|
||||||
# Triage
|
# Triage
|
||||||
typus = message.get('type')
|
typus = message.get('type')
|
||||||
if typus is None:
|
if typus is None:
|
||||||
log.error('%s: No message type, dropping message: %s'
|
log.error('%s: No message type, dropping message: %s'
|
||||||
% (self.__class__.__name__, message))
|
% (self.__class__.__name__, message))
|
||||||
return False
|
return
|
||||||
log.debug('%s: Received message from PMS server: %s'
|
log.debug('%s: Received message from PMS server: %s'
|
||||||
% (self.__class__.__name__, message))
|
% (self.__class__.__name__, message))
|
||||||
# Drop everything we're not interested in
|
# Drop everything we're not interested in
|
||||||
if typus not in ('playing', 'timeline'):
|
if typus not in ('playing', 'timeline', 'activity'):
|
||||||
return True
|
return
|
||||||
|
elif typus == 'activity' and state.DB_SCAN is True:
|
||||||
# Put PMS message on queue and let libsync take care of it
|
# Only add to processing if PKC is NOT doing a lib scan (and thus
|
||||||
self.queue.put(message)
|
# possibly causing these reprocessing messages en mass)
|
||||||
return True
|
log.debug('%s: Dropping message as PKC is currently synching'
|
||||||
|
% self.__class__.__name__)
|
||||||
|
else:
|
||||||
|
# Put PMS message on queue and let libsync take care of it
|
||||||
|
self.queue.put(message)
|
||||||
|
|
||||||
def IOError_response(self):
|
def IOError_response(self):
|
||||||
log.warn("Repeatedly could not connect to PMS, "
|
log.warn("Repeatedly could not connect to PMS, "
|
||||||
|
@ -244,7 +248,7 @@ class Alexa_Websocket(WebSocket):
|
||||||
|
|
||||||
def process(self, opcode, message):
|
def process(self, opcode, message):
|
||||||
if opcode not in self.opcode_data:
|
if opcode not in self.opcode_data:
|
||||||
return False
|
return
|
||||||
log.debug('%s: Received the following message from Alexa:'
|
log.debug('%s: Received the following message from Alexa:'
|
||||||
% self.__class__.__name__)
|
% self.__class__.__name__)
|
||||||
log.debug('%s: %s' % (self.__class__.__name__, message))
|
log.debug('%s: %s' % (self.__class__.__name__, message))
|
||||||
|
@ -253,22 +257,21 @@ class Alexa_Websocket(WebSocket):
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
log.error('%s: Error decoding message from Alexa: %s'
|
log.error('%s: Error decoding message from Alexa: %s'
|
||||||
% (self.__class__.__name__, ex))
|
% (self.__class__.__name__, ex))
|
||||||
return False
|
return
|
||||||
try:
|
try:
|
||||||
if message.attrib['command'] == 'processRemoteControlCommand':
|
if message.attrib['command'] == 'processRemoteControlCommand':
|
||||||
message = message[0]
|
message = message[0]
|
||||||
else:
|
else:
|
||||||
log.error('%s: Unknown Alexa message received'
|
log.error('%s: Unknown Alexa message received'
|
||||||
% self.__class__.__name__)
|
% self.__class__.__name__)
|
||||||
return False
|
return
|
||||||
except:
|
except:
|
||||||
log.error('%s: Could not parse Alexa message'
|
log.error('%s: Could not parse Alexa message'
|
||||||
% self.__class__.__name__)
|
% self.__class__.__name__)
|
||||||
return False
|
return
|
||||||
process_command(message.attrib['path'][1:],
|
process_command(message.attrib['path'][1:],
|
||||||
message.attrib,
|
message.attrib,
|
||||||
queue=self.mgr.plexCompanion.queue)
|
queue=self.mgr.plexCompanion.queue)
|
||||||
return True
|
|
||||||
|
|
||||||
def IOError_response(self):
|
def IOError_response(self):
|
||||||
pass
|
pass
|
||||||
|
|
Loading…
Add table
Reference in a new issue