Rewire suspension of waking up of websocket threads

This commit is contained in:
croneter 2020-05-07 07:26:03 +02:00
parent 92a28b6eda
commit dccd3e512b
6 changed files with 157 additions and 56 deletions

View file

@ -5,12 +5,19 @@ Used to save PKC's application state and share between modules. Be careful
if you invoke another PKC Python instance (!!) when e.g. PKC.movies is called if you invoke another PKC Python instance (!!) when e.g. PKC.movies is called
""" """
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from copy import deepcopy
from logging import getLogger
import xbmc
from .account import Account from .account import Account
from .application import App from .application import App
from .connection import Connection from .connection import Connection
from .libsync import Sync from .libsync import Sync
from .playstate import PlayState from .playstate import PlayState
LOG = getLogger('PLEX.app')
ACCOUNT = None ACCOUNT = None
APP = None APP = None
CONN = None CONN = None
@ -30,3 +37,56 @@ def init(entrypoint=False):
SYNC = Sync(entrypoint) SYNC = Sync(entrypoint)
if not entrypoint: if not entrypoint:
PLAYSTATE = PlayState() PLAYSTATE = PlayState()
def _check_thread_suspension():
global ACCOUNT, APP, SYNC
threads_to_be_suspended = set()
if SYNC.background_sync_disabled:
threads_to_be_suspended.add(APP.pms_websocket)
if not SYNC.enable_alexa or not ACCOUNT.plex_token:
threads_to_be_suspended.add(APP.alexa_websocket)
if ACCOUNT.restricted_user:
threads_to_be_suspended.add(APP.pms_websocket)
threads_to_be_suspended.add(APP.alexa_websocket)
if None in threads_to_be_suspended:
threads_to_be_suspended.remove(None)
return threads_to_be_suspended
def resume_threads():
"""
Resume all thread activity with or without blocking. Won't resume websocket
threads if they should not be resumed
Returns True only if PKC shutdown requested
"""
global APP
threads = deepcopy(APP.threads)
threads_to_be_suspended = _check_thread_suspension()
LOG.debug('Not resuming the following threads: %s', threads_to_be_suspended)
for thread in threads_to_be_suspended:
try:
threads.remove(thread)
except ValueError:
pass
LOG.debug('Thus resuming the following threads: %s', threads)
for thread in threads:
thread.resume()
return xbmc.Monitor().abortRequested()
def check_websocket_threads_suspend():
threads_to_be_suspended = _check_thread_suspension()
for thread in threads_to_be_suspended:
thread.suspend()
def suspend_threads(block=True):
global APP
APP.suspend_threads(block=block)
def reload():
global APP, SYNC
APP.reload()
SYNC.reload()

View file

@ -22,7 +22,7 @@ class App(object):
if entrypoint: if entrypoint:
self.load_entrypoint() self.load_entrypoint()
else: else:
self.load() self.reload()
# Quit PKC? # Quit PKC?
self.stop_pkc = False self.stop_pkc = False
# This will suspend the main thread also # This will suspend the main thread also
@ -51,6 +51,8 @@ class App(object):
self.fanart_thread = None self.fanart_thread = None
# Instance of ImageCachingThread() # Instance of ImageCachingThread()
self.caching_thread = None self.caching_thread = None
self.pms_websocket = None
self.alexa_websocket = None
@property @property
def is_playing(self): def is_playing(self):
@ -102,6 +104,48 @@ class App(object):
except AttributeError: except AttributeError:
pass pass
def register_pms_websocket(self, thread):
self.pms_websocket = thread
self.threads.append(thread)
def deregister_pms_websocket(self, thread):
self.pms_websocket.unblock_callers()
self.pms_websocket = None
self.threads.remove(thread)
def suspend_pms_websocket(self, block=True):
try:
self.pms_websocket.suspend(block=block)
except AttributeError:
pass
def resume_pms_websocket(self):
try:
self.pms_websocket.resume()
except AttributeError:
pass
def register_alexa_websocket(self, thread):
self.alexa_websocket = thread
self.threads.append(thread)
def deregister_alexa_websocket(self, thread):
self.alexa_websocket.unblock_callers()
self.alexa_websocket = None
self.threads.remove(thread)
def suspend_alexa_websocket(self, block=True):
try:
self.alexa_websocket.suspend(block=block)
except AttributeError:
pass
def resume_alexa_websocket(self):
try:
self.alexa_websocket.resume()
except AttributeError:
pass
def register_thread(self, thread): def register_thread(self, thread):
""" """
Hit with thread [backgroundthread.Killablethread instance] to register Hit with thread [backgroundthread.Killablethread instance] to register
@ -136,16 +180,6 @@ class App(object):
break break
return xbmc.Monitor().abortRequested() return xbmc.Monitor().abortRequested()
def resume_threads(self):
"""
Resume all thread activity with or without blocking.
Returns True only if PKC shutdown requested
"""
LOG.debug('Resuming threads: %s', self.threads)
for thread in self.threads:
thread.resume()
return xbmc.Monitor().abortRequested()
def stop_threads(self, block=True): def stop_threads(self, block=True):
""" """
Stop all threads. Will block until all threads are stopped Stop all threads. Will block until all threads are stopped
@ -160,7 +194,7 @@ class App(object):
if xbmc.sleep(100): if xbmc.sleep(100):
return True return True
def load(self): def reload(self):
# Number of items to fetch and display in widgets # Number of items to fetch and display in widgets
self.fetch_pms_item_number = int(utils.settings('fetch_pms_item_number')) self.fetch_pms_item_number = int(utils.settings('fetch_pms_item_number'))
# Hack to force Kodi widget for "in progress" to show up if it was empty # Hack to force Kodi widget for "in progress" to show up if it was empty

View file

@ -81,6 +81,8 @@ class Sync(object):
# re-built if sections are set a-new # re-built if sections are set a-new
self.section_ids = set() self.section_ids = set()
self.enable_alexa = None
self.load() self.load()
@property @property
@ -113,11 +115,19 @@ class Sync(object):
self.show_extras_instead_of_playing_trailer = utils.settings('showExtrasInsteadOfTrailer') == 'true' self.show_extras_instead_of_playing_trailer = utils.settings('showExtrasInsteadOfTrailer') == 'true'
self.sync_specific_plex_playlists = utils.settings('syncSpecificPlexPlaylists') == 'true' self.sync_specific_plex_playlists = utils.settings('syncSpecificPlexPlaylists') == 'true'
self.sync_specific_kodi_playlists = utils.settings('syncSpecificKodiPlaylists') == 'true' self.sync_specific_kodi_playlists = utils.settings('syncSpecificKodiPlaylists') == 'true'
self.sync_thread_number = int(utils.settings('syncThreadNumber'))
self.reload()
def reload(self):
"""
Any settings unrelated to syncs to the Kodi database - can thus be
safely reset without a Kodi reboot
"""
self.sync_dialog = utils.settings('dbSyncIndicator') == 'true' self.sync_dialog = utils.settings('dbSyncIndicator') == 'true'
self.full_sync_intervall = int(utils.settings('fullSyncInterval')) * 60 self.full_sync_intervall = int(utils.settings('fullSyncInterval')) * 60
self.background_sync_disabled = utils.settings('enableBackgroundSync') == 'false' self.background_sync_disabled = utils.settings('enableBackgroundSync') == 'false'
self.backgroundsync_saftymargin = int(utils.settings('backgroundsync_saftyMargin')) self.backgroundsync_saftymargin = int(utils.settings('backgroundsync_saftyMargin'))
self.sync_thread_number = int(utils.settings('syncThreadNumber'))
self.image_sync_notifications = utils.settings('imageSyncNotifications') == 'true' self.image_sync_notifications = utils.settings('imageSyncNotifications') == 'true'
self.enable_alexa = utils.settings('enable_alexa') == 'true'

View file

@ -116,7 +116,7 @@ class Service(object):
# PMS was online before # PMS was online before
LOG.warn("Plex Media Server went offline") LOG.warn("Plex Media Server went offline")
app.CONN.online = False app.CONN.online = False
app.APP.suspend_threads() app.suspend_threads()
LOG.debug('Threads suspended') LOG.debug('Threads suspended')
if utils.settings('show_pms_offline') == 'true': if utils.settings('show_pms_offline') == 'true':
utils.dialog('notification', utils.dialog('notification',
@ -154,7 +154,7 @@ class Service(object):
if app.ACCOUNT.authenticated: if app.ACCOUNT.authenticated:
# Server got offline when we were authenticated. # Server got offline when we were authenticated.
# Hence resume threads # Hence resume threads
app.APP.resume_threads() app.resume_threads()
app.CONN.online = True app.CONN.online = True
finally: finally:
self.connection_check_running = False self.connection_check_running = False
@ -165,7 +165,7 @@ class Service(object):
Ensures that lib sync threads are suspended; signs out user Ensures that lib sync threads are suspended; signs out user
""" """
LOG.info('Log-out requested') LOG.info('Log-out requested')
app.APP.suspend_threads() app.suspend_threads()
LOG.info('Successfully suspended threads') LOG.info('Successfully suspended threads')
app.ACCOUNT.log_out() app.ACCOUNT.log_out()
LOG.info('User has been logged out') LOG.info('User has been logged out')
@ -248,7 +248,10 @@ class Service(object):
icon='{plex}', icon='{plex}',
time=2000, time=2000,
sound=False) sound=False)
app.APP.resume_threads() app.reload()
app.check_websocket_threads_suspend()
app.resume_threads()
self.auth_running = False self.auth_running = False
def enter_new_pms_address(self): def enter_new_pms_address(self):
@ -290,7 +293,7 @@ class Service(object):
# "Unauthorized for PMS" # "Unauthorized for PMS"
utils.dialog('notification', utils.lang(29999), utils.lang(30017)) utils.dialog('notification', utils.lang(29999), utils.lang(30017))
return return
app.APP.suspend_threads() app.suspend_threads()
from .library_sync import sections from .library_sync import sections
try: try:
# Get newest sections from the PMS # Get newest sections from the PMS
@ -300,14 +303,14 @@ class Service(object):
library_sync.force_full_sync() library_sync.force_full_sync()
app.SYNC.run_lib_scan = 'full' app.SYNC.run_lib_scan = 'full'
finally: finally:
app.APP.resume_threads() app.resume_threads()
def reset_playlists_and_nodes(self): def reset_playlists_and_nodes(self):
""" """
Resets the Kodi playlists and nodes for all the PKC libraries by Resets the Kodi playlists and nodes for all the PKC libraries by
deleting all of them first, then rewriting everything deleting all of them first, then rewriting everything
""" """
app.APP.suspend_threads() app.suspend_threads()
from .library_sync import sections from .library_sync import sections
try: try:
sections.clear_window_vars() sections.clear_window_vars()
@ -329,7 +332,7 @@ class Service(object):
icon='{plex}', icon='{plex}',
sound=False) sound=False)
finally: finally:
app.APP.resume_threads() app.resume_threads()
xbmc.executebuiltin('ReloadSkin()') xbmc.executebuiltin('ReloadSkin()')
def _do_auth(self): def _do_auth(self):
@ -362,7 +365,7 @@ class Service(object):
if not user: if not user:
LOG.info('No user received') LOG.info('No user received')
app.APP.suspend = True app.APP.suspend = True
app.APP.suspend_threads() app.suspend_threads()
LOG.debug('Threads suspended') LOG.debug('Threads suspended')
return False return False
username = user.title username = user.title
@ -398,7 +401,7 @@ class Service(object):
else: else:
LOG.debug('Suspending threads') LOG.debug('Suspending threads')
app.APP.suspend = True app.APP.suspend = True
app.APP.suspend_threads() app.suspend_threads()
LOG.debug('Threads suspended') LOG.debug('Threads suspended')
return False return False
elif res >= 400: elif res >= 400:
@ -535,7 +538,6 @@ class Service(object):
self.sync.start() self.sync.start()
self.plexcompanion.start() self.plexcompanion.start()
self.playqueue.start() self.playqueue.start()
if utils.settings('enable_alexa') == 'true':
self.alexa.start() self.alexa.start()
xbmc.sleep(100) xbmc.sleep(100)

View file

@ -603,7 +603,7 @@ def reset(ask_user=True):
return return
from . import app from . import app
# first stop any db sync # first stop any db sync
app.APP.suspend_threads() app.suspend_threads()
# Reset all PlexKodiConnect Addon settings? (this is usually NOT # Reset all PlexKodiConnect Addon settings? (this is usually NOT
# recommended and unnecessary!) # recommended and unnecessary!)
if ask_user and yesno_dialog(lang(29999), lang(39603)): if ask_user and yesno_dialog(lang(29999), lang(39603)):

View file

@ -22,6 +22,11 @@ class WebSocket(backgroundthread.KillableThread):
self.sleeptime = 0.0 self.sleeptime = 0.0
super(WebSocket, self).__init__() super(WebSocket, self).__init__()
def close_websocket(self):
if self.ws is not None:
self.ws.close()
self.ws = None
def process(self, opcode, message): def process(self, opcode, message):
raise NotImplementedError raise NotImplementedError
@ -56,26 +61,11 @@ class WebSocket(backgroundthread.KillableThread):
if self.sleeptime < 6: if self.sleeptime < 6:
self.sleeptime += 1.0 self.sleeptime += 1.0
def run(self):
LOG.info("----===## Starting %s ##===----", self.__class__.__name__)
app.APP.register_thread(self)
try:
self._run()
finally:
# Close websocket connection on shutdown
if self.ws is not None:
self.ws.close()
app.APP.deregister_thread(self)
LOG.info("##===---- %s Stopped ----===##", self.__class__.__name__)
def _run(self): def _run(self):
while not self.should_cancel(): while not self.should_cancel():
# In the event the server goes offline # In the event the server goes offline
if self.should_suspend(): if self.should_suspend():
# Set in service.py self.close_websocket()
if self.ws is not None:
self.ws.close()
self.ws = None
if self.wait_while_suspended(): if self.wait_while_suspended():
# Abort was requested while waiting. We should exit # Abort was requested while waiting. We should exit
return return
@ -132,8 +122,7 @@ class WebSocket(backgroundthread.KillableThread):
import traceback import traceback
LOG.error("%s: Traceback:\n%s", LOG.error("%s: Traceback:\n%s",
self.__class__.__name__, traceback.format_exc()) self.__class__.__name__, traceback.format_exc())
if self.ws is not None: self.close_websocket()
self.ws.close()
self.ws = None self.ws = None
@ -141,11 +130,15 @@ class PMS_Websocket(WebSocket):
""" """
Websocket connection with the PMS for Plex Companion Websocket connection with the PMS for Plex Companion
""" """
def should_suspend(self): def run(self):
""" LOG.info("----===## Starting Websocket ##===----")
Returns True if the thread is suspended app.APP.register_pms_websocket(self)
""" try:
return self._suspended or app.SYNC.background_sync_disabled self._run()
finally:
self.close_websocket()
app.APP.deregister_pms_websocket(self)
LOG.info("##===---- Websocket Stopped ----===##")
def getUri(self): def getUri(self):
if self.redirect_uri: if self.redirect_uri:
@ -206,13 +199,15 @@ class Alexa_Websocket(WebSocket):
""" """
Websocket connection to talk to Amazon Alexa. Websocket connection to talk to Amazon Alexa.
""" """
def should_suspend(self): def run(self):
""" LOG.info("----===## Starting Alexa Websocket ##===----")
Overwrite method since we need to check for plex token app.APP.register_alexa_websocket(self)
""" try:
return (self._suspended or self._run()
not app.ACCOUNT.plex_token or finally:
app.ACCOUNT.restricted_user) self.close_websocket()
app.APP.deregister_alexa_websocket(self)
LOG.info("##===---- Alexa Websocket Stopped ----===##")
def getUri(self): def getUri(self):
if self.redirect_uri: if self.redirect_uri: