From 9b76795ea4b81481649f5dbd358c59db1f150171 Mon Sep 17 00:00:00 2001 From: Croneter Date: Thu, 21 Jun 2018 20:43:39 +0200 Subject: [PATCH] Attempt to fix locking mechanisms - Wraper to lock entire function was NOT working --- resources/lib/kodimonitor.py | 23 +- resources/lib/playback.py | 12 +- resources/lib/playlists.py | 50 +++-- resources/lib/playqueue.py | 19 +- resources/lib/plex_companion.py | 15 +- resources/lib/plexbmchelper/subscribers.py | 231 +++++++++++---------- resources/lib/state.py | 37 +--- 7 files changed, 183 insertions(+), 204 deletions(-) diff --git a/resources/lib/kodimonitor.py b/resources/lib/kodimonitor.py index e20302dd..2ee2cdb3 100644 --- a/resources/lib/kodimonitor.py +++ b/resources/lib/kodimonitor.py @@ -138,7 +138,8 @@ class KodiMonitor(xbmc.Monitor): if method == "Player.OnPlay": state.SUSPEND_SYNC = True - self.PlayBackStart(data) + with state.LOCK_PLAYQUEUES: + self.PlayBackStart(data) elif method == "Player.OnStop": # Should refresh our video nodes, e.g. on deck # xbmc.executebuiltin('ReloadSkin()') @@ -146,23 +147,28 @@ class KodiMonitor(xbmc.Monitor): self.hack_replay == data['item']): # Hack for add-on paths self.hack_replay = None - self._hack_addon_paths_replay_video() + with state.LOCK_PLAYQUEUES: + self._hack_addon_paths_replay_video() elif data.get('end'): if state.PKC_CAUSED_STOP is True: state.PKC_CAUSED_STOP = False LOG.debug('PKC caused this playback stop - ignoring') else: - _playback_cleanup(ended=True) + with state.LOCK_PLAYQUEUES: + _playback_cleanup(ended=True) else: - _playback_cleanup() + with state.LOCK_PLAYQUEUES: + _playback_cleanup() state.PKC_CAUSED_STOP_DONE = True state.SUSPEND_SYNC = False elif method == 'Playlist.OnAdd': - self._playlist_onadd(data) + with state.LOCK_PLAYQUEUES: + self._playlist_onadd(data) elif method == 'Playlist.OnRemove': self._playlist_onremove(data) elif method == 'Playlist.OnClear': - self._playlist_onclear(data) + with state.LOCK_PLAYQUEUES: + self._playlist_onclear(data) elif method == "VideoLibrary.OnUpdate": # Manually marking as watched/unwatched playcount = data.get('playcount') @@ -208,7 +214,6 @@ class KodiMonitor(xbmc.Monitor): state.STOP_PKC = True @staticmethod - @state.LOCKER_SUBSCRIBER.lockthis def _hack_addon_paths_replay_video(): """ Hack we need for RESUMABLE items because Kodi lost the path of the @@ -239,7 +244,6 @@ class KodiMonitor(xbmc.Monitor): thread = Thread(target=playback.playback_triage, kwargs=kwargs) thread.start() - @state.LOCKER_SUBSCRIBER.lockthis def _playlist_onadd(self, data): """ Called if an item is added to a Kodi playlist. Example data dict: @@ -272,7 +276,6 @@ class KodiMonitor(xbmc.Monitor): """ pass - @state.LOCKER_SUBSCRIBER.lockthis def _playlist_onclear(self, data): """ Called if a Kodi playlist is cleared. Example data dict: @@ -348,7 +351,6 @@ class KodiMonitor(xbmc.Monitor): json_item.get('type'), json_item.get('file')) - @state.LOCKER_SUBSCRIBER.lockthis def PlayBackStart(self, data): """ Called whenever playback is started. Example data: @@ -486,7 +488,6 @@ class SpecialMonitor(Thread): LOG.info("#====---- Special Monitor Stopped ----====#") -@state.LOCKER_SUBSCRIBER.lockthis def _playback_cleanup(ended=False): """ PKC cleanup after playback ends/is stopped. Pass ended=True if Kodi diff --git a/resources/lib/playback.py b/resources/lib/playback.py index 7be1f5d4..7b41ad4c 100644 --- a/resources/lib/playback.py +++ b/resources/lib/playback.py @@ -30,7 +30,6 @@ NULL_VIDEO = join(v.ADDON_FOLDER, 'addons', v.ADDON_ID, 'empty_video.mp4') ############################################################################### -@state.LOCKER_SUBSCRIBER.lockthis def playback_triage(plex_id=None, plex_type=None, path=None, resolve=True): """ Hit this function for addon path playback, Plex trailers, etc. @@ -107,11 +106,12 @@ def playback_triage(plex_id=None, plex_type=None, path=None, resolve=True): initiate = True else: initiate = False - if initiate: - _playback_init(plex_id, plex_type, playqueue, pos) - else: - # kick off playback on second pass - _conclude_playback(playqueue, pos) + with state.LOCK_PLAYQUEUES: + if initiate: + _playback_init(plex_id, plex_type, playqueue, pos) + else: + # kick off playback on second pass + _conclude_playback(playqueue, pos) def _playlist_playback(plex_id, plex_type): diff --git a/resources/lib/playlists.py b/resources/lib/playlists.py index f0c99ded..a340f575 100644 --- a/resources/lib/playlists.py +++ b/resources/lib/playlists.py @@ -267,41 +267,49 @@ def _kodi_playlist_identical(xml_element): pass -@state.LOCKER_PLAYLISTS.lockthis def process_websocket(plex_id, updated_at, state): """ Hit by librarysync to process websocket messages concerning playlists """ create = False playlist = playlist_object_from_db(plex_id=plex_id) - try: - if playlist and state == 9: - LOG.debug('Plex deletion of playlist detected: %s', playlist) - delete_kodi_playlist(playlist) - elif playlist and playlist.plex_updatedat == updated_at: - LOG.debug('Playlist with id %s already synced: %s', - plex_id, playlist) - elif playlist: - LOG.debug('Change of Plex playlist detected: %s', playlist) - delete_kodi_playlist(playlist) - create = True - elif not playlist and not state == 9: - LOG.debug('Creation of new Plex playlist detected: %s', plex_id) - create = True - # To the actual work - if create: - create_kodi_playlist(plex_id=plex_id, updated_at=updated_at) - except PL.PlaylistError: - pass + with state.LOCK_PLAYLISTS: + try: + if playlist and state == 9: + LOG.debug('Plex deletion of playlist detected: %s', playlist) + delete_kodi_playlist(playlist) + elif playlist and playlist.plex_updatedat == updated_at: + LOG.debug('Playlist with id %s already synced: %s', + plex_id, playlist) + elif playlist: + LOG.debug('Change of Plex playlist detected: %s', playlist) + delete_kodi_playlist(playlist) + create = True + elif not playlist and not state == 9: + LOG.debug('Creation of new Plex playlist detected: %s', + plex_id) + create = True + # To the actual work + if create: + create_kodi_playlist(plex_id=plex_id, updated_at=updated_at) + except PL.PlaylistError: + pass -@state.LOCKER_PLAYLISTS.lockthis def full_sync(): """ Full sync of playlists between Kodi and Plex. Returns True is successful, False otherwise """ LOG.info('Starting playlist full sync') + with state.LOCK_PLAYLISTS: + return _full_sync() + + +def _full_sync(): + """ + Need to lock because we're messing with playlists + """ # Get all Plex playlists xml = PL.get_all_playlists() if xml is None: diff --git a/resources/lib/playqueue.py b/resources/lib/playqueue.py index 232caf0a..8654a143 100644 --- a/resources/lib/playqueue.py +++ b/resources/lib/playqueue.py @@ -34,7 +34,7 @@ def init_playqueues(): LOG.debug('Playqueues have already been initialized') return # Initialize Kodi playqueues - with state.LOCK_SUBSCRIBER: + with state.LOCK_PLAYQUEUES: for i in (0, 1, 2): # Just in case the Kodi response is not sorted correctly for queue in js.get_playlists(): @@ -62,14 +62,13 @@ def get_playqueue_from_type(kodi_playlist_type): Returns the playqueue according to the kodi_playlist_type ('video', 'audio', 'picture') passed in """ - with state.LOCK_SUBSCRIBER: - for playqueue in PLAYQUEUES: - if playqueue.type == kodi_playlist_type: - break - else: - raise ValueError('Wrong playlist type passed in: %s', - kodi_playlist_type) - return playqueue + for playqueue in PLAYQUEUES: + if playqueue.type == kodi_playlist_type: + break + else: + raise ValueError('Wrong playlist type passed in: %s', + kodi_playlist_type) + return playqueue def init_playqueue_from_plex_children(plex_id, transient_token=None): @@ -190,7 +189,7 @@ class PlayqueueMonitor(Thread): if stopped(): break xbmc.sleep(1000) - with state.LOCK_SUBSCRIBER: + with state.LOCK_PLAYQUEUES: for playqueue in PLAYQUEUES: kodi_pl = js.playlist_get_items(playqueue.playlistid) if playqueue.old_kodi_pl != kodi_pl: diff --git a/resources/lib/plex_companion.py b/resources/lib/plex_companion.py index 49969759..c582ca68 100644 --- a/resources/lib/plex_companion.py +++ b/resources/lib/plex_companion.py @@ -43,7 +43,7 @@ def update_playqueue_from_PMS(playqueue, # Safe transient token from being deleted if transient_token is None: transient_token = playqueue.plex_transient_token - with state.LOCK_SUBSCRIBER: + with state.LOCK_PLAYQUEUES: xml = PL.get_PMS_playlist(playqueue, playqueue_id) playqueue.clear() try: @@ -74,7 +74,6 @@ class PlexCompanion(Thread): self.subscription_manager = None Thread.__init__(self) - @state.LOCKER_SUBSCRIBER.lockthis def _process_alexa(self, data): xml = PF.GetPlexMetadata(data['key']) try: @@ -131,7 +130,6 @@ class PlexCompanion(Thread): executebuiltin('RunPlugin(plugin://%s?%s)' % (v.ADDON_ID, urlencode(params))) - @state.LOCKER_SUBSCRIBER.lockthis def _process_playlist(self, data): # Get the playqueue ID _, container_key, query = PF.ParseContainerKey(data['containerKey']) @@ -156,7 +154,6 @@ class PlexCompanion(Thread): offset=data.get('offset'), transient_token=data.get('token')) - @state.LOCKER_SUBSCRIBER.lockthis def _process_streams(self, data): """ Plex Companion client adjusted audio or subtitle stream @@ -178,7 +175,6 @@ class PlexCompanion(Thread): else: LOG.error('Unknown setStreams command: %s', data) - @state.LOCKER_SUBSCRIBER.lockthis def _process_refresh(self, data): """ example data: {'playQueueID': '8475', 'commandID': '11'} @@ -217,14 +213,17 @@ class PlexCompanion(Thread): LOG.debug('Processing: %s', task) data = task['data'] if task['action'] == 'alexa': - self._process_alexa(data) + with state.LOCK_PLAYQUEUES: + self._process_alexa(data) elif (task['action'] == 'playlist' and data.get('address') == 'node.plexapp.com'): self._process_node(data) elif task['action'] == 'playlist': - self._process_playlist(data) + with state.LOCK_PLAYQUEUES: + self._process_playlist(data) elif task['action'] == 'refreshPlayQueue': - self._process_refresh(data) + with state.LOCK_PLAYQUEUES: + self._process_refresh(data) elif task['action'] == 'setStreams': try: self._process_streams(data) diff --git a/resources/lib/plexbmchelper/subscribers.py b/resources/lib/plexbmchelper/subscribers.py index 3dffcd1e..0965b02a 100644 --- a/resources/lib/plexbmchelper/subscribers.py +++ b/resources/lib/plexbmchelper/subscribers.py @@ -145,7 +145,6 @@ class SubscriptionMgr(object): position = info['position'] return position - @state.LOCKER_SUBSCRIBER.lockthis def msg(self, players): """ Returns a timeline xml as str @@ -185,94 +184,98 @@ class SubscriptionMgr(object): return answ def _timeline_dict(self, player, ptype): - playerid = player['playerid'] - info = state.PLAYER_STATES[playerid] - playqueue = PQ.PLAYQUEUES[playerid] - position = self._get_correct_position(info, playqueue) - try: - item = playqueue.items[position] - except IndexError: - # E.g. for direct path playback for single item - return { + with state.LOCK_PLAYQUEUES: + playerid = player['playerid'] + info = state.PLAYER_STATES[playerid] + playqueue = PQ.PLAYQUEUES[playerid] + position = self._get_correct_position(info, playqueue) + try: + item = playqueue.items[position] + except IndexError: + # E.g. for direct path playback for single item + return { + 'controllable': CONTROLLABLE[ptype], + 'type': ptype, + 'state': 'stopped' + } + if ptype in (v.PLEX_PLAYLIST_TYPE_VIDEO, + v.PLEX_PLAYLIST_TYPE_PHOTO): + self.location = 'fullScreenVideo' + self.stop_sent_to_web = False + pbmc_server = utils.window('pms_server') + if pbmc_server: + (self.protocol, self.server, self.port) = pbmc_server.split(':') + self.server = self.server.replace('/', '') + status = 'paused' if int(info['speed']) == 0 else 'playing' + duration = utils.kodi_time_to_millis(info['totaltime']) + shuffle = '1' if info['shuffled'] else '0' + mute = '1' if info['muted'] is True else '0' + answ = { 'controllable': CONTROLLABLE[ptype], + 'protocol': self.protocol, + 'address': self.server, + 'port': self.port, + 'machineIdentifier': utils.window('plex_machineIdentifier'), + 'state': status, 'type': ptype, - 'state': 'stopped' + 'itemType': ptype, + 'time': utils.kodi_time_to_millis(info['time']), + 'duration': duration, + 'seekRange': '0-%s' % duration, + 'shuffle': shuffle, + 'repeat': v.PLEX_REPEAT_FROM_KODI_REPEAT[info['repeat']], + 'volume': info['volume'], + 'mute': mute, + 'mediaIndex': 0, # Still to implement from here + 'partIndex': 0, + 'partCount': 1, + 'providerIdentifier': 'com.plexapp.plugins.library', } - if ptype in (v.PLEX_PLAYLIST_TYPE_VIDEO, v.PLEX_PLAYLIST_TYPE_PHOTO): - self.location = 'fullScreenVideo' - self.stop_sent_to_web = False - pbmc_server = utils.window('pms_server') - if pbmc_server: - (self.protocol, self.server, self.port) = pbmc_server.split(':') - self.server = self.server.replace('/', '') - status = 'paused' if int(info['speed']) == 0 else 'playing' - duration = utils.kodi_time_to_millis(info['totaltime']) - shuffle = '1' if info['shuffled'] else '0' - mute = '1' if info['muted'] is True else '0' - answ = { - 'controllable': CONTROLLABLE[ptype], - 'protocol': self.protocol, - 'address': self.server, - 'port': self.port, - 'machineIdentifier': utils.window('plex_machineIdentifier'), - 'state': status, - 'type': ptype, - 'itemType': ptype, - 'time': utils.kodi_time_to_millis(info['time']), - 'duration': duration, - 'seekRange': '0-%s' % duration, - 'shuffle': shuffle, - 'repeat': v.PLEX_REPEAT_FROM_KODI_REPEAT[info['repeat']], - 'volume': info['volume'], - 'mute': mute, - 'mediaIndex': 0, # Still to implement from here - 'partIndex': 0, - 'partCount': 1, - 'providerIdentifier': 'com.plexapp.plugins.library', - } - # Get the plex id from the PKC playqueue not info, as Kodi jumps to next - # playqueue element way BEFORE kodi monitor onplayback is called - if item.plex_id: - answ['key'] = '/library/metadata/%s' % item.plex_id - answ['ratingKey'] = item.plex_id - # PlayQueue stuff - if info['container_key']: - answ['containerKey'] = info['container_key'] - if (info['container_key'] is not None and - info['container_key'].startswith('/playQueues')): - answ['playQueueID'] = playqueue.id - answ['playQueueVersion'] = playqueue.version - answ['playQueueItemID'] = item.id - if playqueue.items[position].guid: - answ['guid'] = item.guid - # Temp. token set? - if state.PLEX_TRANSIENT_TOKEN: - answ['token'] = state.PLEX_TRANSIENT_TOKEN - elif playqueue.plex_transient_token: - answ['token'] = playqueue.plex_transient_token - # Process audio and subtitle streams - if ptype == v.PLEX_PLAYLIST_TYPE_VIDEO: - strm_id = self._plex_stream_index(playerid, 'audio') - if strm_id: - answ['audioStreamID'] = strm_id - else: - LOG.error('We could not select a Plex audiostream') - strm_id = self._plex_stream_index(playerid, 'video') - if strm_id: - answ['videoStreamID'] = strm_id - else: - LOG.error('We could not select a Plex videostream') - if info['subtitleenabled']: - try: - strm_id = self._plex_stream_index(playerid, 'subtitle') - except KeyError: - # subtitleenabled can be True while currentsubtitle can - # still be {} - strm_id = None - if strm_id is not None: - # If None, then the subtitle is only present on Kodi side - answ['subtitleStreamID'] = strm_id - return answ + # Get the plex id from the PKC playqueue not info, as Kodi jumps to + # next playqueue element way BEFORE kodi monitor onplayback is + # called + if item.plex_id: + answ['key'] = '/library/metadata/%s' % item.plex_id + answ['ratingKey'] = item.plex_id + # PlayQueue stuff + if info['container_key']: + answ['containerKey'] = info['container_key'] + if (info['container_key'] is not None and + info['container_key'].startswith('/playQueues')): + answ['playQueueID'] = playqueue.id + answ['playQueueVersion'] = playqueue.version + answ['playQueueItemID'] = item.id + if playqueue.items[position].guid: + answ['guid'] = item.guid + # Temp. token set? + if state.PLEX_TRANSIENT_TOKEN: + answ['token'] = state.PLEX_TRANSIENT_TOKEN + elif playqueue.plex_transient_token: + answ['token'] = playqueue.plex_transient_token + # Process audio and subtitle streams + if ptype == v.PLEX_PLAYLIST_TYPE_VIDEO: + strm_id = self._plex_stream_index(playerid, 'audio') + if strm_id: + answ['audioStreamID'] = strm_id + else: + LOG.error('We could not select a Plex audiostream') + strm_id = self._plex_stream_index(playerid, 'video') + if strm_id: + answ['videoStreamID'] = strm_id + else: + LOG.error('We could not select a Plex videostream') + if info['subtitleenabled']: + try: + strm_id = self._plex_stream_index(playerid, 'subtitle') + except KeyError: + # subtitleenabled can be True while currentsubtitle can + # still be {} + strm_id = None + if strm_id is not None: + # If None, then the subtitle is only present on Kodi + # side + answ['subtitleStreamID'] = strm_id + return answ def signal_stop(self): """ @@ -297,14 +300,14 @@ class SubscriptionMgr(object): return playqueue.items[position].plex_stream_index( info[STREAM_DETAILS[stream_type]]['index'], stream_type) - @state.LOCKER_SUBSCRIBER.lockthis def update_command_id(self, uuid, command_id): """ Updates the Plex Companien client with the machine identifier uuid with command_id """ - if command_id and self.subscribers.get(uuid): - self.subscribers[uuid].command_id = int(command_id) + with state.LOCK_SUBSCRIBER: + if command_id and self.subscribers.get(uuid): + self.subscribers[uuid].command_id = int(command_id) def _playqueue_init_done(self, players): """ @@ -327,29 +330,29 @@ class SubscriptionMgr(object): return False return True - @state.LOCKER_SUBSCRIBER.lockthis def notify(self): """ Causes PKC to tell the PMS and Plex Companion players to receive a notification what's being played. """ - self._cleanup() - # Get all the active/playing Kodi players (video, audio, pictures) - players = js.get_players() - # Update the PKC info with what's playing on the Kodi side - for player in players.values(): - update_player_info(player['playerid']) - # Check whether we can use the CURRENT info or whether PKC is still - # initializing - if self._playqueue_init_done(players) is False: - LOG.debug('PKC playqueue is still initializing - skipping update') - return - self._notify_server(players) - if self.subscribers: - msg = self.msg(players) - for subscriber in self.subscribers.values(): - subscriber.send_update(msg) - self.lastplayers = players + with state.LOCK_SUBSCRIBER: + self._cleanup() + # Get all the active/playing Kodi players (video, audio, pictures) + players = js.get_players() + # Update the PKC info with what's playing on the Kodi side + for player in players.values(): + update_player_info(player['playerid']) + # Check whether we can use the CURRENT info or whether PKC is still + # initializing + if self._playqueue_init_done(players) is False: + LOG.debug('PKC playqueue is still initializing - skip update') + return + self._notify_server(players) + if self.subscribers: + msg = self.msg(players) + for subscriber in self.subscribers.values(): + subscriber.send_update(msg) + self.lastplayers = players def _notify_server(self, players): for typus, player in players.iteritems(): @@ -410,7 +413,6 @@ class SubscriptionMgr(object): LOG.debug("Sent server notification with parameters: %s to %s", xargs, url) - @state.LOCKER_SUBSCRIBER.lockthis def add_subscriber(self, protocol, host, port, uuid, command_id): """ Adds a new Plex Companion subscriber to PKC. @@ -422,20 +424,21 @@ class SubscriptionMgr(object): command_id, self, self.request_mgr) - self.subscribers[subscriber.uuid] = subscriber + with state.LOCK_SUBSCRIBER: + self.subscribers[subscriber.uuid] = subscriber return subscriber - @state.LOCKER_SUBSCRIBER.lockthis def remove_subscriber(self, uuid): """ Removes a connected Plex Companion subscriber with machine identifier uuid from PKC notifications. (Calls the cleanup() method of the subscriber) """ - for subscriber in self.subscribers.values(): - if subscriber.uuid == uuid or subscriber.host == uuid: - subscriber.cleanup() - del self.subscribers[subscriber.uuid] + with state.LOCK_SUBSCRIBER: + for subscriber in self.subscribers.values(): + if subscriber.uuid == uuid or subscriber.host == uuid: + subscriber.cleanup() + del self.subscribers[subscriber.uuid] def _cleanup(self): for subscriber in self.subscribers.values(): diff --git a/resources/lib/state.py b/resources/lib/state.py index 0687825a..7cd67882 100644 --- a/resources/lib/state.py +++ b/resources/lib/state.py @@ -1,48 +1,17 @@ # -*- coding: utf-8 -*- # THREAD SAFE from threading import Lock, RLock -from functools import wraps - - -class LockFunction(object): - """ - Decorator for class methods and functions to lock them with lock. - - Initialize this class first - lockfunction = LockFunction(lock), where lock is a threading.Lock() object - - To then lock a function or method: - - @lockfunction.lockthis - def some_function(args, kwargs) - """ - def __init__(self, lock): - self.lock = lock - - def lockthis(self, func): - """ - Use this method to actually lock a function or method - """ - @wraps(func) - def wrapper(*args, **kwargs): - """ - Wrapper construct - """ - with self.lock: - result = func(*args, **kwargs) - return result - return wrapper # LOCKS #################### -# Need to lock all methods and functions messing with subscribers +# Need to lock all methods and functions messing with Plex Companion subscribers LOCK_SUBSCRIBER = RLock() -LOCKER_SUBSCRIBER = LockFunction(LOCK_SUBSCRIBER) +# Need to lock everything messing with Kodi/PKC playqueues +LOCK_PLAYQUEUES = RLock() # Necessary to temporarily hold back librarysync/websocket listener when doing # a full sync LOCK_PLAYLISTS = Lock() -LOCKER_PLAYLISTS = LockFunction(LOCK_PLAYLISTS) # Quit PKC STOP_PKC = False