Attempt to fix locking mechanisms

- Wraper to lock entire function was NOT working
This commit is contained in:
Croneter 2018-06-21 20:43:39 +02:00
parent c440dc7779
commit 9b76795ea4
7 changed files with 183 additions and 204 deletions

View file

@ -138,6 +138,7 @@ class KodiMonitor(xbmc.Monitor):
if method == "Player.OnPlay": if method == "Player.OnPlay":
state.SUSPEND_SYNC = True state.SUSPEND_SYNC = True
with state.LOCK_PLAYQUEUES:
self.PlayBackStart(data) self.PlayBackStart(data)
elif method == "Player.OnStop": elif method == "Player.OnStop":
# Should refresh our video nodes, e.g. on deck # Should refresh our video nodes, e.g. on deck
@ -146,22 +147,27 @@ class KodiMonitor(xbmc.Monitor):
self.hack_replay == data['item']): self.hack_replay == data['item']):
# Hack for add-on paths # Hack for add-on paths
self.hack_replay = None self.hack_replay = None
with state.LOCK_PLAYQUEUES:
self._hack_addon_paths_replay_video() self._hack_addon_paths_replay_video()
elif data.get('end'): elif data.get('end'):
if state.PKC_CAUSED_STOP is True: if state.PKC_CAUSED_STOP is True:
state.PKC_CAUSED_STOP = False state.PKC_CAUSED_STOP = False
LOG.debug('PKC caused this playback stop - ignoring') LOG.debug('PKC caused this playback stop - ignoring')
else: else:
with state.LOCK_PLAYQUEUES:
_playback_cleanup(ended=True) _playback_cleanup(ended=True)
else: else:
with state.LOCK_PLAYQUEUES:
_playback_cleanup() _playback_cleanup()
state.PKC_CAUSED_STOP_DONE = True state.PKC_CAUSED_STOP_DONE = True
state.SUSPEND_SYNC = False state.SUSPEND_SYNC = False
elif method == 'Playlist.OnAdd': elif method == 'Playlist.OnAdd':
with state.LOCK_PLAYQUEUES:
self._playlist_onadd(data) self._playlist_onadd(data)
elif method == 'Playlist.OnRemove': elif method == 'Playlist.OnRemove':
self._playlist_onremove(data) self._playlist_onremove(data)
elif method == 'Playlist.OnClear': elif method == 'Playlist.OnClear':
with state.LOCK_PLAYQUEUES:
self._playlist_onclear(data) self._playlist_onclear(data)
elif method == "VideoLibrary.OnUpdate": elif method == "VideoLibrary.OnUpdate":
# Manually marking as watched/unwatched # Manually marking as watched/unwatched
@ -208,7 +214,6 @@ class KodiMonitor(xbmc.Monitor):
state.STOP_PKC = True state.STOP_PKC = True
@staticmethod @staticmethod
@state.LOCKER_SUBSCRIBER.lockthis
def _hack_addon_paths_replay_video(): def _hack_addon_paths_replay_video():
""" """
Hack we need for RESUMABLE items because Kodi lost the path of the Hack we need for RESUMABLE items because Kodi lost the path of the
@ -239,7 +244,6 @@ class KodiMonitor(xbmc.Monitor):
thread = Thread(target=playback.playback_triage, kwargs=kwargs) thread = Thread(target=playback.playback_triage, kwargs=kwargs)
thread.start() thread.start()
@state.LOCKER_SUBSCRIBER.lockthis
def _playlist_onadd(self, data): def _playlist_onadd(self, data):
""" """
Called if an item is added to a Kodi playlist. Example data dict: Called if an item is added to a Kodi playlist. Example data dict:
@ -272,7 +276,6 @@ class KodiMonitor(xbmc.Monitor):
""" """
pass pass
@state.LOCKER_SUBSCRIBER.lockthis
def _playlist_onclear(self, data): def _playlist_onclear(self, data):
""" """
Called if a Kodi playlist is cleared. Example data dict: Called if a Kodi playlist is cleared. Example data dict:
@ -348,7 +351,6 @@ class KodiMonitor(xbmc.Monitor):
json_item.get('type'), json_item.get('type'),
json_item.get('file')) json_item.get('file'))
@state.LOCKER_SUBSCRIBER.lockthis
def PlayBackStart(self, data): def PlayBackStart(self, data):
""" """
Called whenever playback is started. Example data: Called whenever playback is started. Example data:
@ -486,7 +488,6 @@ class SpecialMonitor(Thread):
LOG.info("#====---- Special Monitor Stopped ----====#") LOG.info("#====---- Special Monitor Stopped ----====#")
@state.LOCKER_SUBSCRIBER.lockthis
def _playback_cleanup(ended=False): def _playback_cleanup(ended=False):
""" """
PKC cleanup after playback ends/is stopped. Pass ended=True if Kodi PKC cleanup after playback ends/is stopped. Pass ended=True if Kodi

View file

@ -30,7 +30,6 @@ NULL_VIDEO = join(v.ADDON_FOLDER, 'addons', v.ADDON_ID, 'empty_video.mp4')
############################################################################### ###############################################################################
@state.LOCKER_SUBSCRIBER.lockthis
def playback_triage(plex_id=None, plex_type=None, path=None, resolve=True): def playback_triage(plex_id=None, plex_type=None, path=None, resolve=True):
""" """
Hit this function for addon path playback, Plex trailers, etc. Hit this function for addon path playback, Plex trailers, etc.
@ -107,6 +106,7 @@ def playback_triage(plex_id=None, plex_type=None, path=None, resolve=True):
initiate = True initiate = True
else: else:
initiate = False initiate = False
with state.LOCK_PLAYQUEUES:
if initiate: if initiate:
_playback_init(plex_id, plex_type, playqueue, pos) _playback_init(plex_id, plex_type, playqueue, pos)
else: else:

View file

@ -267,13 +267,13 @@ def _kodi_playlist_identical(xml_element):
pass pass
@state.LOCKER_PLAYLISTS.lockthis
def process_websocket(plex_id, updated_at, state): def process_websocket(plex_id, updated_at, state):
""" """
Hit by librarysync to process websocket messages concerning playlists Hit by librarysync to process websocket messages concerning playlists
""" """
create = False create = False
playlist = playlist_object_from_db(plex_id=plex_id) playlist = playlist_object_from_db(plex_id=plex_id)
with state.LOCK_PLAYLISTS:
try: try:
if playlist and state == 9: if playlist and state == 9:
LOG.debug('Plex deletion of playlist detected: %s', playlist) LOG.debug('Plex deletion of playlist detected: %s', playlist)
@ -286,7 +286,8 @@ def process_websocket(plex_id, updated_at, state):
delete_kodi_playlist(playlist) delete_kodi_playlist(playlist)
create = True create = True
elif not playlist and not state == 9: elif not playlist and not state == 9:
LOG.debug('Creation of new Plex playlist detected: %s', plex_id) LOG.debug('Creation of new Plex playlist detected: %s',
plex_id)
create = True create = True
# To the actual work # To the actual work
if create: if create:
@ -295,13 +296,20 @@ def process_websocket(plex_id, updated_at, state):
pass pass
@state.LOCKER_PLAYLISTS.lockthis
def full_sync(): def full_sync():
""" """
Full sync of playlists between Kodi and Plex. Returns True is successful, Full sync of playlists between Kodi and Plex. Returns True is successful,
False otherwise False otherwise
""" """
LOG.info('Starting playlist full sync') LOG.info('Starting playlist full sync')
with state.LOCK_PLAYLISTS:
return _full_sync()
def _full_sync():
"""
Need to lock because we're messing with playlists
"""
# Get all Plex playlists # Get all Plex playlists
xml = PL.get_all_playlists() xml = PL.get_all_playlists()
if xml is None: if xml is None:

View file

@ -34,7 +34,7 @@ def init_playqueues():
LOG.debug('Playqueues have already been initialized') LOG.debug('Playqueues have already been initialized')
return return
# Initialize Kodi playqueues # Initialize Kodi playqueues
with state.LOCK_SUBSCRIBER: with state.LOCK_PLAYQUEUES:
for i in (0, 1, 2): for i in (0, 1, 2):
# Just in case the Kodi response is not sorted correctly # Just in case the Kodi response is not sorted correctly
for queue in js.get_playlists(): for queue in js.get_playlists():
@ -62,7 +62,6 @@ def get_playqueue_from_type(kodi_playlist_type):
Returns the playqueue according to the kodi_playlist_type ('video', Returns the playqueue according to the kodi_playlist_type ('video',
'audio', 'picture') passed in 'audio', 'picture') passed in
""" """
with state.LOCK_SUBSCRIBER:
for playqueue in PLAYQUEUES: for playqueue in PLAYQUEUES:
if playqueue.type == kodi_playlist_type: if playqueue.type == kodi_playlist_type:
break break
@ -190,7 +189,7 @@ class PlayqueueMonitor(Thread):
if stopped(): if stopped():
break break
xbmc.sleep(1000) xbmc.sleep(1000)
with state.LOCK_SUBSCRIBER: with state.LOCK_PLAYQUEUES:
for playqueue in PLAYQUEUES: for playqueue in PLAYQUEUES:
kodi_pl = js.playlist_get_items(playqueue.playlistid) kodi_pl = js.playlist_get_items(playqueue.playlistid)
if playqueue.old_kodi_pl != kodi_pl: if playqueue.old_kodi_pl != kodi_pl:

View file

@ -43,7 +43,7 @@ def update_playqueue_from_PMS(playqueue,
# Safe transient token from being deleted # Safe transient token from being deleted
if transient_token is None: if transient_token is None:
transient_token = playqueue.plex_transient_token transient_token = playqueue.plex_transient_token
with state.LOCK_SUBSCRIBER: with state.LOCK_PLAYQUEUES:
xml = PL.get_PMS_playlist(playqueue, playqueue_id) xml = PL.get_PMS_playlist(playqueue, playqueue_id)
playqueue.clear() playqueue.clear()
try: try:
@ -74,7 +74,6 @@ class PlexCompanion(Thread):
self.subscription_manager = None self.subscription_manager = None
Thread.__init__(self) Thread.__init__(self)
@state.LOCKER_SUBSCRIBER.lockthis
def _process_alexa(self, data): def _process_alexa(self, data):
xml = PF.GetPlexMetadata(data['key']) xml = PF.GetPlexMetadata(data['key'])
try: try:
@ -131,7 +130,6 @@ class PlexCompanion(Thread):
executebuiltin('RunPlugin(plugin://%s?%s)' executebuiltin('RunPlugin(plugin://%s?%s)'
% (v.ADDON_ID, urlencode(params))) % (v.ADDON_ID, urlencode(params)))
@state.LOCKER_SUBSCRIBER.lockthis
def _process_playlist(self, data): def _process_playlist(self, data):
# Get the playqueue ID # Get the playqueue ID
_, container_key, query = PF.ParseContainerKey(data['containerKey']) _, container_key, query = PF.ParseContainerKey(data['containerKey'])
@ -156,7 +154,6 @@ class PlexCompanion(Thread):
offset=data.get('offset'), offset=data.get('offset'),
transient_token=data.get('token')) transient_token=data.get('token'))
@state.LOCKER_SUBSCRIBER.lockthis
def _process_streams(self, data): def _process_streams(self, data):
""" """
Plex Companion client adjusted audio or subtitle stream Plex Companion client adjusted audio or subtitle stream
@ -178,7 +175,6 @@ class PlexCompanion(Thread):
else: else:
LOG.error('Unknown setStreams command: %s', data) LOG.error('Unknown setStreams command: %s', data)
@state.LOCKER_SUBSCRIBER.lockthis
def _process_refresh(self, data): def _process_refresh(self, data):
""" """
example data: {'playQueueID': '8475', 'commandID': '11'} example data: {'playQueueID': '8475', 'commandID': '11'}
@ -217,13 +213,16 @@ class PlexCompanion(Thread):
LOG.debug('Processing: %s', task) LOG.debug('Processing: %s', task)
data = task['data'] data = task['data']
if task['action'] == 'alexa': if task['action'] == 'alexa':
with state.LOCK_PLAYQUEUES:
self._process_alexa(data) self._process_alexa(data)
elif (task['action'] == 'playlist' and elif (task['action'] == 'playlist' and
data.get('address') == 'node.plexapp.com'): data.get('address') == 'node.plexapp.com'):
self._process_node(data) self._process_node(data)
elif task['action'] == 'playlist': elif task['action'] == 'playlist':
with state.LOCK_PLAYQUEUES:
self._process_playlist(data) self._process_playlist(data)
elif task['action'] == 'refreshPlayQueue': elif task['action'] == 'refreshPlayQueue':
with state.LOCK_PLAYQUEUES:
self._process_refresh(data) self._process_refresh(data)
elif task['action'] == 'setStreams': elif task['action'] == 'setStreams':
try: try:

View file

@ -145,7 +145,6 @@ class SubscriptionMgr(object):
position = info['position'] position = info['position']
return position return position
@state.LOCKER_SUBSCRIBER.lockthis
def msg(self, players): def msg(self, players):
""" """
Returns a timeline xml as str Returns a timeline xml as str
@ -185,6 +184,7 @@ class SubscriptionMgr(object):
return answ return answ
def _timeline_dict(self, player, ptype): def _timeline_dict(self, player, ptype):
with state.LOCK_PLAYQUEUES:
playerid = player['playerid'] playerid = player['playerid']
info = state.PLAYER_STATES[playerid] info = state.PLAYER_STATES[playerid]
playqueue = PQ.PLAYQUEUES[playerid] playqueue = PQ.PLAYQUEUES[playerid]
@ -198,7 +198,8 @@ class SubscriptionMgr(object):
'type': ptype, 'type': ptype,
'state': 'stopped' 'state': 'stopped'
} }
if ptype in (v.PLEX_PLAYLIST_TYPE_VIDEO, v.PLEX_PLAYLIST_TYPE_PHOTO): if ptype in (v.PLEX_PLAYLIST_TYPE_VIDEO,
v.PLEX_PLAYLIST_TYPE_PHOTO):
self.location = 'fullScreenVideo' self.location = 'fullScreenVideo'
self.stop_sent_to_web = False self.stop_sent_to_web = False
pbmc_server = utils.window('pms_server') pbmc_server = utils.window('pms_server')
@ -230,8 +231,9 @@ class SubscriptionMgr(object):
'partCount': 1, 'partCount': 1,
'providerIdentifier': 'com.plexapp.plugins.library', 'providerIdentifier': 'com.plexapp.plugins.library',
} }
# Get the plex id from the PKC playqueue not info, as Kodi jumps to next # Get the plex id from the PKC playqueue not info, as Kodi jumps to
# playqueue element way BEFORE kodi monitor onplayback is called # next playqueue element way BEFORE kodi monitor onplayback is
# called
if item.plex_id: if item.plex_id:
answ['key'] = '/library/metadata/%s' % item.plex_id answ['key'] = '/library/metadata/%s' % item.plex_id
answ['ratingKey'] = item.plex_id answ['ratingKey'] = item.plex_id
@ -270,7 +272,8 @@ class SubscriptionMgr(object):
# still be {} # still be {}
strm_id = None strm_id = None
if strm_id is not None: if strm_id is not None:
# If None, then the subtitle is only present on Kodi side # If None, then the subtitle is only present on Kodi
# side
answ['subtitleStreamID'] = strm_id answ['subtitleStreamID'] = strm_id
return answ return answ
@ -297,12 +300,12 @@ class SubscriptionMgr(object):
return playqueue.items[position].plex_stream_index( return playqueue.items[position].plex_stream_index(
info[STREAM_DETAILS[stream_type]]['index'], stream_type) info[STREAM_DETAILS[stream_type]]['index'], stream_type)
@state.LOCKER_SUBSCRIBER.lockthis
def update_command_id(self, uuid, command_id): def update_command_id(self, uuid, command_id):
""" """
Updates the Plex Companien client with the machine identifier uuid with Updates the Plex Companien client with the machine identifier uuid with
command_id command_id
""" """
with state.LOCK_SUBSCRIBER:
if command_id and self.subscribers.get(uuid): if command_id and self.subscribers.get(uuid):
self.subscribers[uuid].command_id = int(command_id) self.subscribers[uuid].command_id = int(command_id)
@ -327,12 +330,12 @@ class SubscriptionMgr(object):
return False return False
return True return True
@state.LOCKER_SUBSCRIBER.lockthis
def notify(self): def notify(self):
""" """
Causes PKC to tell the PMS and Plex Companion players to receive a Causes PKC to tell the PMS and Plex Companion players to receive a
notification what's being played. notification what's being played.
""" """
with state.LOCK_SUBSCRIBER:
self._cleanup() self._cleanup()
# Get all the active/playing Kodi players (video, audio, pictures) # Get all the active/playing Kodi players (video, audio, pictures)
players = js.get_players() players = js.get_players()
@ -342,7 +345,7 @@ class SubscriptionMgr(object):
# Check whether we can use the CURRENT info or whether PKC is still # Check whether we can use the CURRENT info or whether PKC is still
# initializing # initializing
if self._playqueue_init_done(players) is False: if self._playqueue_init_done(players) is False:
LOG.debug('PKC playqueue is still initializing - skipping update') LOG.debug('PKC playqueue is still initializing - skip update')
return return
self._notify_server(players) self._notify_server(players)
if self.subscribers: if self.subscribers:
@ -410,7 +413,6 @@ class SubscriptionMgr(object):
LOG.debug("Sent server notification with parameters: %s to %s", LOG.debug("Sent server notification with parameters: %s to %s",
xargs, url) xargs, url)
@state.LOCKER_SUBSCRIBER.lockthis
def add_subscriber(self, protocol, host, port, uuid, command_id): def add_subscriber(self, protocol, host, port, uuid, command_id):
""" """
Adds a new Plex Companion subscriber to PKC. Adds a new Plex Companion subscriber to PKC.
@ -422,16 +424,17 @@ class SubscriptionMgr(object):
command_id, command_id,
self, self,
self.request_mgr) self.request_mgr)
with state.LOCK_SUBSCRIBER:
self.subscribers[subscriber.uuid] = subscriber self.subscribers[subscriber.uuid] = subscriber
return subscriber return subscriber
@state.LOCKER_SUBSCRIBER.lockthis
def remove_subscriber(self, uuid): def remove_subscriber(self, uuid):
""" """
Removes a connected Plex Companion subscriber with machine identifier Removes a connected Plex Companion subscriber with machine identifier
uuid from PKC notifications. uuid from PKC notifications.
(Calls the cleanup() method of the subscriber) (Calls the cleanup() method of the subscriber)
""" """
with state.LOCK_SUBSCRIBER:
for subscriber in self.subscribers.values(): for subscriber in self.subscribers.values():
if subscriber.uuid == uuid or subscriber.host == uuid: if subscriber.uuid == uuid or subscriber.host == uuid:
subscriber.cleanup() subscriber.cleanup()

View file

@ -1,48 +1,17 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# THREAD SAFE # THREAD SAFE
from threading import Lock, RLock from threading import Lock, RLock
from functools import wraps
class LockFunction(object):
"""
Decorator for class methods and functions to lock them with lock.
Initialize this class first
lockfunction = LockFunction(lock), where lock is a threading.Lock() object
To then lock a function or method:
@lockfunction.lockthis
def some_function(args, kwargs)
"""
def __init__(self, lock):
self.lock = lock
def lockthis(self, func):
"""
Use this method to actually lock a function or method
"""
@wraps(func)
def wrapper(*args, **kwargs):
"""
Wrapper construct
"""
with self.lock:
result = func(*args, **kwargs)
return result
return wrapper
# LOCKS # LOCKS
#################### ####################
# Need to lock all methods and functions messing with subscribers # Need to lock all methods and functions messing with Plex Companion subscribers
LOCK_SUBSCRIBER = RLock() LOCK_SUBSCRIBER = RLock()
LOCKER_SUBSCRIBER = LockFunction(LOCK_SUBSCRIBER) # Need to lock everything messing with Kodi/PKC playqueues
LOCK_PLAYQUEUES = RLock()
# Necessary to temporarily hold back librarysync/websocket listener when doing # Necessary to temporarily hold back librarysync/websocket listener when doing
# a full sync # a full sync
LOCK_PLAYLISTS = Lock() LOCK_PLAYLISTS = Lock()
LOCKER_PLAYLISTS = LockFunction(LOCK_PLAYLISTS)
# Quit PKC # Quit PKC
STOP_PKC = False STOP_PKC = False