diff --git a/resources/lib/itemtypes/__init__.py b/resources/lib/itemtypes/__init__.py index c68ed730..3c1e8be9 100644 --- a/resources/lib/itemtypes/__init__.py +++ b/resources/lib/itemtypes/__init__.py @@ -18,3 +18,13 @@ ITEMTYPE_FROM_PLEXTYPE = { v.PLEX_TYPE_ALBUM: Album, v.PLEX_TYPE_SONG: Song } + +ITEMTYPE_FROM_KODITYPE = { + v.KODI_TYPE_MOVIE: Movie, + v.KODI_TYPE_SHOW: Show, + v.KODI_TYPE_SEASON: Season, + v.KODI_TYPE_EPISODE: Episode, + v.KODI_TYPE_ARTIST: Artist, + v.KODI_TYPE_ALBUM: Album, + v.KODI_TYPE_SONG: Song +} diff --git a/resources/lib/itemtypes/common.py b/resources/lib/itemtypes/common.py index b9b53046..7a352ba0 100644 --- a/resources/lib/itemtypes/common.py +++ b/resources/lib/itemtypes/common.py @@ -109,26 +109,24 @@ class ItemBase(object): db_item[4], userdata['UserRating']) - def updatePlaystate(self, mark_played, view_count, resume, duration, - file_id, lastViewedAt, plex_type): + def update_playstate(self, mark_played, view_count, resume, duration, + kodi_fileid, lastViewedAt, plex_type): """ Use with websockets, not xml """ # If the playback was stopped, check whether we need to increment the # playcount. PMS won't tell us the playcount via websockets - LOG.debug('Playstate file_id %s: viewcount: %s, resume: %s, type: %s', - file_id, view_count, resume, plex_type) if mark_played: - LOG.info('Marking as completely watched in Kodi') + LOG.info('Marking item as completely watched in Kodi') try: view_count += 1 except TypeError: view_count = 1 resume = 0 # Do the actual update - self.kodi_db.set_resume(file_id, + self.kodi_db.set_resume(kodi_fileid, resume, duration, view_count, - lastViewedAt, + utils.unix_date_to_kodi(lastViewedAt), plex_type) diff --git a/resources/lib/library_sync/__init__.py b/resources/lib/library_sync/__init__.py index 8ea84ff4..34aa27f6 100644 --- a/resources/lib/library_sync/__init__.py +++ b/resources/lib/library_sync/__init__.py @@ -3,5 +3,7 @@ from __future__ import absolute_import, division, unicode_literals from .full_sync import start, PLAYLIST_SYNC_ENABLED from .time import sync_pms_time +from .websocket import store_websocket_message, process_websocket_messages, \ + WEBSOCKET_MESSAGES, PLAYSTATE_SESSIONS from .common import update_kodi_library -from . import fanart +from .fanart import ThreadedProcessFanart, FANART_QUEUE diff --git a/resources/lib/library_sync/fanart.py b/resources/lib/library_sync/fanart.py index f3700178..d08ff8ae 100644 --- a/resources/lib/library_sync/fanart.py +++ b/resources/lib/library_sync/fanart.py @@ -9,11 +9,11 @@ from .. import backgroundthread from .. import utils, kodidb_functions as kodidb from .. import itemtypes, artwork, plex_functions as PF, variables as v, state -############################################################################### -LOG = getLogger('PLEX.library_sync.fanart') +LOG = getLogger('PLEX.sync.fanart') -############################################################################### +SYNC_FANART = utils.settings('FanartTV') == 'true' +FANART_QUEUE = backgroundthread.Queue.Queue() class ThreadedProcessFanart(backgroundthread.KillableThread): @@ -30,10 +30,6 @@ class ThreadedProcessFanart(backgroundthread.KillableThread): fanart. If False, will only get missing } """ - def __init__(self, queue): - self.queue = queue - super(ThreadedProcessFanart, self).__init__() - def isCanceled(self): return xbmc.abortRequested or state.STOP_PKC @@ -80,11 +76,11 @@ class ThreadedProcessFanart(backgroundthread.KillableThread): xbmc.sleep(1000) # grabs Plex item from queue try: - item = self.queue.get(block=False) + item = FANART_QUEUE.get(block=False) except backgroundthread.Empty: xbmc.sleep(1000) continue - self.queue.task_done() + FANART_QUEUE.task_done() if isinstance(item, artwork.ArtworkSyncMessage): if state.IMAGE_SYNC_NOTIFICATIONS: utils.dialog('notification', diff --git a/resources/lib/library_sync/websocket.py b/resources/lib/library_sync/websocket.py new file mode 100644 index 00000000..64000702 --- /dev/null +++ b/resources/lib/library_sync/websocket.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +from __future__ import absolute_import, division, unicode_literals +from logging import getLogger + +from .common import update_kodi_library +from .full_sync import PLAYLIST_SYNC_ENABLED +from .fanart import FANART_QUEUE, SYNC_FANART +from ..plex_api import API +from ..plex_db import PlexDB +from .. import playlists, plex_functions as PF, itemtypes +from .. import utils, variables as v, state + +LOG = getLogger('PLEX.sync.websocket') + +WEBSOCKET_MESSAGES = [] +# Dict to save info for Plex items currently being played somewhere +PLAYSTATE_SESSIONS = {} + + +def interrupt_processing(): + return state.STOP_PKC or state.SUSPEND_LIBRARY_THREAD or state.STOP_SYNC + + +def multi_delete(input_list, delete_list): + """ + Deletes the list items of input_list at the positions in delete_list + (which can be in any arbitrary order) + """ + for index in sorted(delete_list, reverse=True): + del input_list[index] + return input_list + + +def store_websocket_message(message): + """ + processes json.loads() messages from websocket. Triage what we need to + do with "process_" methods + """ + if message['type'] == 'playing': + process_playing(message['PlaySessionStateNotification']) + elif message['type'] == 'timeline': + store_timeline_message(message['TimelineEntry']) + elif message['type'] == 'activity': + store_activity_message(message['ActivityNotification']) + + +def process_websocket_messages(): + """ + Periodically called to process new/updated PMS items + + PMS needs a while to download info from internet AFTER it + showed up under 'timeline' websocket messages + + data['type']: + 1: movie + 2: tv show?? + 3: season?? + 4: episode + 8: artist (band) + 9: album + 10: track (song) + 12: trailer, extras? + + data['state']: + 0: 'created', + 2: 'matching', + 3: 'downloading', + 4: 'loading', + 5: 'finished', + 6: 'analyzing', + 9: 'deleted' + """ + global WEBSOCKET_MESSAGES + now = utils.unix_timestamp() + update_kodi_video_library, update_kodi_music_library = False, False + delete_list = [] + for i, message in enumerate(WEBSOCKET_MESSAGES): + if interrupt_processing(): + # Chances are that Kodi gets shut down + break + if message['state'] == 9: + successful, video, music = process_delete_message(message) + elif now - message['timestamp'] < state.BACKGROUNDSYNC_SAFTYMARGIN: + # We haven't waited long enough for the PMS to finish processing the + # item. Do it later (excepting deletions) + continue + else: + successful, video, music = process_new_item_message(message) + if (successful and SYNC_FANART and + message['type'] in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW)): + FANART_QUEUE.put({ + 'plex_id': utils.cast(int, message['ratingKey']), + 'plex_type': message['type'], + 'refresh': False + }) + if successful is True: + delete_list.append(i) + update_kodi_video_library = True if video else update_kodi_video_library + update_kodi_music_library = True if music else update_kodi_music_library + else: + # Safety net if we can't process an item + message['attempt'] += 1 + if message['attempt'] > 3: + LOG.error('Repeatedly could not process message %s, abort', + message) + delete_list.append(i) + + # Get rid of the items we just processed + if delete_list: + WEBSOCKET_MESSAGES = multi_delete(WEBSOCKET_MESSAGES, delete_list) + # Let Kodi know of the change + if update_kodi_video_library or update_kodi_music_library: + update_kodi_library(video=update_kodi_video_library, + music=update_kodi_music_library) + + +def process_new_item_message(message): + xml = PF.GetPlexMetadata(message['ratingKey']) + try: + plex_type = xml[0].attrib['type'] + except (IndexError, KeyError, TypeError): + LOG.error('Could not download metadata for %s', message['ratingKey']) + return False, False, False + LOG.debug("Processing new/updated PMS item: %s", message['ratingKey']) + typus = itemtypes.ITEMTYPE_FROM_PLEXTYPE[plex_type](utils.unix_timestamp()) + typus.add_update(xml[0], + section_name=xml.get('librarySectionTitle'), + section_id=xml.get('librarySectionID')) + return True, plex_type in v.PLEX_VIDEOTYPES, plex_type in v.PLEX_AUDIOTYPES + + +def process_delete_message(message): + plex_type = message['type'] + typus = itemtypes.ITEMTYPE_FROM_PLEXTYPE[plex_type](None) + typus.remove(message['ratingKey'], plex_type=plex_type) + return True, plex_type in v.PLEX_VIDEOTYPES, plex_type in v.PLEX_AUDIOTYPES + + +def store_timeline_message(data): + """ + PMS is messing with the library items, e.g. new or changed. Put in our + "processing queue" for later + """ + global WEBSOCKET_MESSAGES + for message in data: + if 'tv.plex' in message.get('identifier', ''): + # Ommit Plex DVR messages - the Plex IDs are not corresponding + # (DVR ratingKeys are not unique and might correspond to a + # movie or episode) + continue + typus = v.PLEX_TYPE_FROM_WEBSOCKET[int(message['type'])] + if typus == v.PLEX_TYPE_CLIP: + # No need to process extras or trailers + continue + status = int(message['state']) + if typus == 'playlist' and PLAYLIST_SYNC_ENABLED: + playlists.websocket(plex_id=unicode(message['itemID']), + status=status) + elif status == 9: + # Immediately and always process deletions (as the PMS will + # send additional message with other codes) + WEBSOCKET_MESSAGES.append({ + 'state': status, + 'plex_type': typus, + 'plex_id': utils.cast(int, message['itemID']), + 'timestamp': utils.unix_timestamp(), + 'attempt': 0 + }) + elif typus in (v.PLEX_TYPE_MOVIE, + v.PLEX_TYPE_EPISODE, + v.PLEX_TYPE_SONG) and status == 5: + plex_id = int(message['itemID']) + # Have we already added this element for processing? + for existing_message in WEBSOCKET_MESSAGES: + if existing_message['plex_id'] == plex_id: + break + else: + # Haven't added this element to the queue yet + WEBSOCKET_MESSAGES.append({ + 'state': status, + 'plex_type': typus, + 'plex_id': plex_id, + 'timestamp': utils.unix_timestamp(), + 'attempt': 0 + }) + + +def store_activity_message(data): + """ + PMS is re-scanning an item, e.g. after having changed a movie poster. + WATCH OUT for this if it's triggered by our PKC library scan! + """ + global WEBSOCKET_MESSAGES + for message in data: + if message['event'] != 'ended': + # Scan still going on, so skip for now + continue + elif message['Activity'].get('Context') is None: + # Not related to any Plex element, but entire library + continue + elif message['Activity']['type'] != 'library.refresh.items': + # Not the type of message relevant for us + continue + plex_id = PF.GetPlexKeyNumber(message['Activity']['Context']['key'])[1] + if not plex_id: + # Likely a Plex id like /library/metadata/3/children + continue + # We're only looking at existing elements - have we synced yet? + with PlexDB() as plexdb: + typus = plexdb.item_by_id(plex_id, plex_type=None) + if not typus: + LOG.debug('plex_id %s not synced yet - skipping', plex_id) + continue + # Have we already added this element? + for existing_message in WEBSOCKET_MESSAGES: + if existing_message['plex_id'] == plex_id: + break + else: + # Haven't added this element to the queue yet + WEBSOCKET_MESSAGES.append({ + 'state': None, # Don't need a state here + 'plex_type': typus['plex_type'], + 'plex_id': plex_id, + 'timestamp': utils.unix_timestamp(), + 'attempt': 0 + }) + + +def process_playing(data): + """ + Someone (not necessarily the user signed in) is playing something some- + where + """ + global PLAYSTATE_SESSIONS + for message in data: + status = message['state'] + if status == 'buffering' or status == 'stopped': + # Drop buffering and stop messages immediately - no value + continue + plex_id = int(message['ratingKey']) + skip = False + for pid in (0, 1, 2): + if plex_id == state.PLAYER_STATES[pid]['plex_id']: + # Kodi is playing this message - no need to set the playstate + skip = True + if skip: + continue + session_key = message['sessionKey'] + # Do we already have a sessionKey stored? + if session_key not in PLAYSTATE_SESSIONS: + with PlexDB() as plexdb: + typus = plexdb.item_by_id(plex_id, plex_type=None) + if not typus: + # Item not (yet) in Kodi library + continue + if utils.settings('plex_serverowned') == 'false': + # Not our PMS, we are not authorized to get the sessions + # On the bright side, it must be us playing :-) + PLAYSTATE_SESSIONS[session_key] = {} + else: + # PMS is ours - get all current sessions + PLAYSTATE_SESSIONS.update(PF.GetPMSStatus(state.PLEX_TOKEN)) + LOG.debug('Updated current sessions. They are: %s', + PLAYSTATE_SESSIONS) + if session_key not in PLAYSTATE_SESSIONS: + LOG.info('Session key %s still unknown! Skip ' + 'playstate update', session_key) + continue + # Attach Kodi info to the session + PLAYSTATE_SESSIONS[session_key]['kodi_id'] = typus['kodi_id'] + PLAYSTATE_SESSIONS[session_key]['file_id'] = typus['kodi_fileid'] + PLAYSTATE_SESSIONS[session_key]['kodi_type'] = typus['kodi_type'] + session = PLAYSTATE_SESSIONS[session_key] + if utils.settings('plex_serverowned') != 'false': + # Identify the user - same one as signed on with PKC? Skip + # update if neither session's username nor userid match + # (Owner sometime's returns id '1', not always) + if not state.PLEX_TOKEN and session['userId'] == '1': + # PKC not signed in to plex.tv. Plus owner of PMS is + # playing (the '1'). + # Hence must be us (since several users require plex.tv + # token for PKC) + pass + elif not (session['userId'] == state.PLEX_USER_ID or + session['username'] == state.PLEX_USERNAME): + LOG.debug('Our username %s, userid %s did not match ' + 'the session username %s with userid %s', + state.PLEX_USERNAME, + state.PLEX_USER_ID, + session['username'], + session['userId']) + continue + # Get an up-to-date XML from the PMS because PMS will NOT directly + # tell us: duration of item viewCount + if not session.get('duration'): + xml = PF.GetPlexMetadata(plex_id) + if xml in (None, 401): + LOG.error('Could not get up-to-date xml for item %s', + plex_id) + continue + api = API(xml[0]) + userdata = api.userdata() + session['duration'] = userdata['Runtime'] + session['viewCount'] = userdata['PlayCount'] + # Sometimes, Plex tells us resume points in milliseconds and + # not in seconds - thank you very much! + if message['viewOffset'] > session['duration']: + resume = message['viewOffset'] / 1000 + else: + resume = message['viewOffset'] + if resume < v.IGNORE_SECONDS_AT_START: + continue + try: + completed = float(resume) / float(session['duration']) + except (ZeroDivisionError, TypeError): + LOG.error('Could not mark playstate for %s and session %s', + data, session) + continue + if completed >= v.MARK_PLAYED_AT: + # Only mark completely watched ONCE + if session.get('marked_played') is None: + session['marked_played'] = True + mark_played = True + else: + # Don't mark it as completely watched again + continue + else: + mark_played = False + LOG.debug('Update playstate for user %s for %s with plex id %s to ' + 'viewCount %s, resume %s, mark_played %s', + state.PLEX_USERNAME, session['kodi_type'], plex_id, + session['viewCount'], resume, mark_played) + func = itemtypes.ITEMTYPE_FROM_KODITYPE[session['kodi_type']] + with func(None) as fkt: + fkt.update_playstate(mark_played, + session['viewCount'], + resume, + session['duration'], + session['file_id'], + utils.unix_timestamp(), + v.PLEX_TYPE_FROM_KODI_TYPE[session['kodi_type']]) diff --git a/resources/lib/sync.py b/resources/lib/sync.py index f9133531..8f7ae4ae 100644 --- a/resources/lib/sync.py +++ b/resources/lib/sync.py @@ -8,11 +8,10 @@ import xbmc from . import library_sync -from .plex_api import API from .downloadutils import DownloadUtils as DU from . import backgroundthread, utils, path_ops -from . import itemtypes, plex_db, kodidb_functions as kodidb -from . import artwork, plex_functions as PF +from . import plex_db, kodidb_functions as kodidb +from . import artwork from . import variables as v, state LOG = getLogger('PLEX.sync') @@ -36,8 +35,6 @@ class Sync(backgroundthread.KillableThread): The one and only library sync thread. Spawn only 1! """ def __init__(self): - self.items_to_process = [] - self.session_keys = {} self.sync_successful = False self.last_full_sync = 0 if utils.settings('FanartTV') == 'true': @@ -49,9 +46,6 @@ class Sync(backgroundthread.KillableThread): # How long should we wait at least to process new/changed PMS items? # Show sync dialog even if user deactivated? self.force_dialog = False - # Need to be set accordingly later - self.update_kodi_video_library = False - self.update_kodi_music_library = False # Lock used to wait on a full sync, e.g. on initial sync self.lock = backgroundthread.threading.Lock() super(Sync, self).__init__() @@ -97,349 +91,6 @@ class Sync(backgroundthread.KillableThread): message=message, icon='{error}') - def process_message(self, message): - """ - processes json.loads() messages from websocket. Triage what we need to - do with "process_" methods - """ - if message['type'] == 'playing': - self.process_playing(message['PlaySessionStateNotification']) - elif message['type'] == 'timeline': - self.process_timeline(message['TimelineEntry']) - elif message['type'] == 'activity': - self.process_activity(message['ActivityNotification']) - - def multi_delete(self, liste, delete_list): - """ - Deletes the list items of liste at the positions in delete_list - (which can be in any arbitrary order) - """ - indexes = sorted(delete_list, reverse=True) - for index in indexes: - del liste[index] - return liste - - def process_items(self): - """ - Periodically called to process new/updated PMS items - - PMS needs a while to download info from internet AFTER it - showed up under 'timeline' websocket messages - - data['type']: - 1: movie - 2: tv show?? - 3: season?? - 4: episode - 8: artist (band) - 9: album - 10: track (song) - 12: trailer, extras? - - data['state']: - 0: 'created', - 2: 'matching', - 3: 'downloading', - 4: 'loading', - 5: 'finished', - 6: 'analyzing', - 9: 'deleted' - """ - now = utils.unix_timestamp() - delete_list = [] - for i, item in enumerate(self.items_to_process): - if self.isCanceled() or self.suspended(): - # Chances are that Kodi gets shut down - break - if item['state'] == 9: - successful = self.process_deleteditems(item) - elif now - item['timestamp'] < state.BACKGROUNDSYNC_SAFTYMARGIN: - # We haven't waited long enough for the PMS to finish - # processing the item. Do it later (excepting deletions) - continue - else: - successful = self.process_newitems(item) - if successful and utils.settings('FanartTV') == 'true': - if item['type'] in (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_SHOW): - self.fanartqueue.put({ - 'plex_id': item['ratingKey'], - 'plex_type': item['type'], - 'refresh': False - }) - if successful is True: - delete_list.append(i) - else: - # Safety net if we can't process an item - item['attempt'] += 1 - if item['attempt'] > 3: - LOG.error('Repeatedly could not process item %s, abort', - item) - delete_list.append(i) - - # Get rid of the items we just processed - if delete_list: - self.items_to_process = self.multi_delete(self.items_to_process, - delete_list) - # Let Kodi know of the change - if self.update_kodi_video_library or self.update_kodi_music_library: - library_sync.update_library(video=self.update_kodi_video_library, - music=self.update_kodi_music_library) - self.update_kodi_video_library = False - self.update_kodi_music_library = False - - def process_newitems(self, item): - xml = PF.GetPlexMetadata(item['ratingKey']) - try: - mediatype = xml[0].attrib['type'] - except (IndexError, KeyError, TypeError): - LOG.error('Could not download metadata for %s', item['ratingKey']) - return False - LOG.debug("Processing new/updated PMS item: %s", item['ratingKey']) - viewtag = xml.attrib.get('librarySectionTitle') - viewid = xml.attrib.get('librarySectionID') - if mediatype == v.PLEX_TYPE_MOVIE: - self.update_kodi_video_library = True - with itemtypes.Movies() as movie: - movie.add_update(xml[0], - viewtag=viewtag, - viewid=viewid) - elif mediatype == v.PLEX_TYPE_EPISODE: - self.update_kodi_video_library = True - with itemtypes.TVShows() as show: - show.add_updateEpisode(xml[0], - viewtag=viewtag, - viewid=viewid) - elif mediatype == v.PLEX_TYPE_SONG: - self.update_kodi_music_library = True - with itemtypes.Music() as music_db: - music_db.add_updateSong(xml[0], viewtag=viewtag, viewid=viewid) - return True - - def process_deleteditems(self, item): - if item['type'] == v.PLEX_TYPE_MOVIE: - LOG.debug("Removing movie %s", item['ratingKey']) - self.update_kodi_video_library = True - with itemtypes.Movies() as movie: - movie.remove(item['ratingKey']) - elif item['type'] in (v.PLEX_TYPE_SHOW, - v.PLEX_TYPE_SEASON, - v.PLEX_TYPE_EPISODE): - LOG.debug("Removing episode/season/show with plex id %s", - item['ratingKey']) - self.update_kodi_video_library = True - with itemtypes.TVShows() as show: - show.remove(item['ratingKey']) - elif item['type'] in (v.PLEX_TYPE_ARTIST, - v.PLEX_TYPE_ALBUM, - v.PLEX_TYPE_SONG): - LOG.debug("Removing song/album/artist %s", item['ratingKey']) - self.update_kodi_music_library = True - with itemtypes.Music() as music_db: - music_db.remove(item['ratingKey']) - return True - - def process_timeline(self, data): - """ - PMS is messing with the library items, e.g. new or changed. Put in our - "processing queue" for later - """ - for item in data: - if 'tv.plex' in item.get('identifier', ''): - # Ommit Plex DVR messages - the Plex IDs are not corresponding - # (DVR ratingKeys are not unique and might correspond to a - # movie or episode) - continue - typus = v.PLEX_TYPE_FROM_WEBSOCKET[int(item['type'])] - if typus == v.PLEX_TYPE_CLIP: - # No need to process extras or trailers - continue - status = int(item['state']) - if typus == 'playlist': - if not library_sync.PLAYLIST_SYNC_ENABLED: - continue - playlists.websocket(plex_id=unicode(item['itemID']), - status=status) - elif status == 9: - # Immediately and always process deletions (as the PMS will - # send additional message with other codes) - self.items_to_process.append({ - 'state': status, - 'type': typus, - 'ratingKey': str(item['itemID']), - 'timestamp': utils.unix_timestamp(), - 'attempt': 0 - }) - elif typus in (v.PLEX_TYPE_MOVIE, - v.PLEX_TYPE_EPISODE, - v.PLEX_TYPE_SONG) and status == 5: - plex_id = str(item['itemID']) - # Have we already added this element for processing? - for existing_item in self.items_to_process: - if existing_item['ratingKey'] == plex_id: - break - else: - # Haven't added this element to the queue yet - self.items_to_process.append({ - 'state': status, - 'type': typus, - 'ratingKey': plex_id, - 'timestamp': utils.unix_timestamp(), - 'attempt': 0 - }) - - def process_activity(self, data): - """ - PMS is re-scanning an item, e.g. after having changed a movie poster. - WATCH OUT for this if it's triggered by our PKC library scan! - """ - for item in data: - if item['event'] != 'ended': - # Scan still going on, so skip for now - continue - elif item['Activity'].get('Context') is None: - # Not related to any Plex element, but entire library - continue - elif item['Activity']['type'] != 'library.refresh.items': - # Not the type of message relevant for us - continue - plex_id = PF.GetPlexKeyNumber(item['Activity']['Context']['key'])[1] - if plex_id == '': - # Likely a Plex id like /library/metadata/3/children - continue - # We're only looking at existing elements - have we synced yet? - with plexdb.Get_Plex_DB() as plex_db: - kodi_info = plex_db.getItem_byId(plex_id) - if kodi_info is None: - LOG.debug('Plex id %s not synced yet - skipping', plex_id) - continue - # Have we already added this element? - for existing_item in self.items_to_process: - if existing_item['ratingKey'] == plex_id: - break - else: - # Haven't added this element to the queue yet - self.items_to_process.append({ - 'state': None, # Don't need a state here - 'type': kodi_info[5], - 'ratingKey': plex_id, - 'timestamp': utils.unix_timestamp(), - 'attempt': 0 - }) - - def process_playing(self, data): - """ - Someone (not necessarily the user signed in) is playing something some- - where - """ - for item in data: - status = item['state'] - if status == 'buffering' or status == 'stopped': - # Drop buffering and stop messages immediately - no value - continue - plex_id = item['ratingKey'] - skip = False - for pid in (0, 1, 2): - if plex_id == state.PLAYER_STATES[pid]['plex_id']: - # Kodi is playing this item - no need to set the playstate - skip = True - if skip: - continue - session_key = item['sessionKey'] - # Do we already have a sessionKey stored? - if session_key not in self.session_keys: - with plexdb.Get_Plex_DB() as plex_db: - kodi_info = plex_db.getItem_byId(plex_id) - if kodi_info is None: - # Item not (yet) in Kodi library - continue - if utils.settings('plex_serverowned') == 'false': - # Not our PMS, we are not authorized to get the sessions - # On the bright side, it must be us playing :-) - self.session_keys[session_key] = {} - else: - # PMS is ours - get all current sessions - self.session_keys.update(PF.GetPMSStatus(state.PLEX_TOKEN)) - LOG.debug('Updated current sessions. They are: %s', - self.session_keys) - if session_key not in self.session_keys: - LOG.info('Session key %s still unknown! Skip ' - 'playstate update', session_key) - continue - # Attach Kodi info to the session - self.session_keys[session_key]['kodi_id'] = kodi_info[0] - self.session_keys[session_key]['file_id'] = kodi_info[1] - self.session_keys[session_key]['kodi_type'] = kodi_info[4] - session = self.session_keys[session_key] - if utils.settings('plex_serverowned') != 'false': - # Identify the user - same one as signed on with PKC? Skip - # update if neither session's username nor userid match - # (Owner sometime's returns id '1', not always) - if not state.PLEX_TOKEN and session['userId'] == '1': - # PKC not signed in to plex.tv. Plus owner of PMS is - # playing (the '1'). - # Hence must be us (since several users require plex.tv - # token for PKC) - pass - elif not (session['userId'] == state.PLEX_USER_ID or - session['username'] == state.PLEX_USERNAME): - LOG.debug('Our username %s, userid %s did not match ' - 'the session username %s with userid %s', - state.PLEX_USERNAME, - state.PLEX_USER_ID, - session['username'], - session['userId']) - continue - # Get an up-to-date XML from the PMS because PMS will NOT directly - # tell us: duration of item viewCount - if session.get('duration') is None: - xml = PF.GetPlexMetadata(plex_id) - if xml in (None, 401): - LOG.error('Could not get up-to-date xml for item %s', - plex_id) - continue - api = API(xml[0]) - userdata = api.userdata() - session['duration'] = userdata['Runtime'] - session['viewCount'] = userdata['PlayCount'] - # Sometimes, Plex tells us resume points in milliseconds and - # not in seconds - thank you very much! - if item['viewOffset'] > session['duration']: - resume = item['viewOffset'] / 1000 - else: - resume = item['viewOffset'] - if resume < v.IGNORE_SECONDS_AT_START: - continue - try: - completed = float(resume) / float(session['duration']) - except (ZeroDivisionError, TypeError): - LOG.error('Could not mark playstate for %s and session %s', - data, session) - continue - if completed >= v.MARK_PLAYED_AT: - # Only mark completely watched ONCE - if session.get('marked_played') is None: - session['marked_played'] = True - mark_played = True - else: - # Don't mark it as completely watched again - continue - else: - mark_played = False - LOG.debug('Update playstate for user %s with id %s for plex id %s', - state.PLEX_USERNAME, state.PLEX_USER_ID, plex_id) - item_fkt = getattr(itemtypes, - v.ITEMTYPE_FROM_KODITYPE[session['kodi_type']]) - with item_fkt() as fkt: - plex_type = v.PLEX_TYPE_FROM_KODI_TYPE[session['kodi_type']] - fkt.updatePlaystate(mark_played, - session['viewCount'], - resume, - session['duration'], - session['file_id'], - utils.unix_date_to_kodi( - utils.unix_timestamp()), - plex_type) - def sync_fanart(self, missing_only=True, refresh=False): """ Throw items to the fanart queue in order to download missing (or all) @@ -549,7 +200,7 @@ class Sync(backgroundthread.KillableThread): install_sync_done = utils.settings('SyncInstallRunDone') == 'true' playlist_monitor = None initial_sync_done = False - last_processing = 0 + last_websocket_processing = 0 last_time_sync = 0 one_day_in_seconds = 60 * 60 * 24 # Link to Websocket queue @@ -691,12 +342,13 @@ class Sync(backgroundthread.KillableThread): library_sync.sync_pms_time() last_time_sync = now elif not state.BACKGROUND_SYNC_DISABLED: - # Check back whether we should process something - # Only do this once every while (otherwise, potentially - # many screen refreshes lead to flickering) - if now - last_processing > 5: - last_processing = now - self.process_items() + # Check back whether we should process something Only do + # this once a while (otherwise, potentially many screen + # refreshes lead to flickering) + if (library_sync.WEBSOCKET_MESSAGES and + now - last_websocket_processing > 5): + last_websocket_processing = now + library_sync.process_websocket_messages() # See if there is a PMS message we need to handle try: message = queue.get(block=False) @@ -704,7 +356,7 @@ class Sync(backgroundthread.KillableThread): pass # Got a message from PMS; process it else: - self.process_message(message) + library_sync.store_websocket_message(message) queue.task_done() # Sleep just a bit xbmc.sleep(10)