Shut down sync more cleanly if interrupted

This commit is contained in:
tomkat83 2016-04-08 09:11:03 +02:00
parent 5289619792
commit 4a63e03615
5 changed files with 117 additions and 54 deletions

View file

@ -444,7 +444,7 @@
<string id="39407">Full library sync finished</string> <string id="39407">Full library sync finished</string>
<string id="39408">Sync had to skip some items because they could not be processed. Kodi may be instable now!! Please post your Kodi logs to the Plex forum.</string> <string id="39408">Sync had to skip some items because they could not be processed. Kodi may be instable now!! Please post your Kodi logs to the Plex forum.</string>
<string id="39409">The Plex Server did not like you asking for so much data at once and returned ERRORS. Try lowering the number of sync download threads in the settings. Skipped some items for now.</string> <string id="39409">The Plex Server did not like you asking for so much data at once and returned ERRORS. Try lowering the number of sync download threads in the settings. Skipped some items for now.</string>
<string id="39410">ERROR in library sync</string>
<!-- Plex videonodes.py --> <!-- Plex videonodes.py -->
<string id="39500">On Deck</string> <string id="39500">On Deck</string>

View file

@ -380,6 +380,7 @@
<string id="39407">Plex Bibliotheken aktualisiert</string> <string id="39407">Plex Bibliotheken aktualisiert</string>
<string id="39408">Einige Plex Einträge mussten übersprungen werden, da sie nicht verarbeitet werden konnten. Kodi ist nun möglicherweise instabil!! Bitte teilen Sie Ihr Kodi log im Plex Forum.</string> <string id="39408">Einige Plex Einträge mussten übersprungen werden, da sie nicht verarbeitet werden konnten. Kodi ist nun möglicherweise instabil!! Bitte teilen Sie Ihr Kodi log im Plex Forum.</string>
<string id="39409">Der Plex Server war überfordert und hat mit ERROR reagiert. Versuchen Sie, in den PKC Optionen die Download Sync Threads Anzahl zu reduzieren. Ein paar Plex Filme wurden nun übersprungen.</string> <string id="39409">Der Plex Server war überfordert und hat mit ERROR reagiert. Versuchen Sie, in den PKC Optionen die Download Sync Threads Anzahl zu reduzieren. Ein paar Plex Filme wurden nun übersprungen.</string>
<string id="39410">Synchronisierungs-ERROR</string>
<!-- Plex videonodes.py --> <!-- Plex videonodes.py -->
<string id="39500">Aktuell</string> <string id="39500">Aktuell</string>

View file

@ -237,6 +237,8 @@ def DownloadChunks(url, containerSize):
if containerSize is None: if containerSize is None:
# Get rid of '?' or '&' at the end of url # Get rid of '?' or '&' at the end of url
xml = downloadutils.DownloadUtils().downloadUrl(url[:-1]) xml = downloadutils.DownloadUtils().downloadUrl(url[:-1])
if xml == 401:
return 401
try: try:
xml.attrib xml.attrib
except AttributeError: except AttributeError:

View file

@ -47,9 +47,21 @@ class ThreadedGetMetadata(Thread):
self.lock = lock self.lock = lock
self.processlock = processlock self.processlock = processlock
# Just in case a time sync goes wrong # Just in case a time sync goes wrong
utils.window('kodiplextimeoffset', value='0') utils.window('kodiplextimeoffset',
utils.settings('kodiplextimeoffset'))
Thread.__init__(self) Thread.__init__(self)
def terminateNow(self):
while not self.queue.empty():
# Still try because remaining item might have been taken
try:
self.queue.get(block=False)
except Queue.Empty:
xbmc.sleep(50)
continue
else:
self.queue.task_done()
def run(self): def run(self):
# cache local variables because it's faster # cache local variables because it's faster
queue = self.queue queue = self.queue
@ -87,15 +99,7 @@ class ThreadedGetMetadata(Thread):
utils.window('plex_scancrashed', value='401') utils.window('plex_scancrashed', value='401')
# Kill remaining items in queue (for main thread to cont.) # Kill remaining items in queue (for main thread to cont.)
queue.task_done() queue.task_done()
while not queue.empty(): self.terminateNow()
# Still try because remaining item might have been taken
try:
queue.get(block=False)
except Queue.Empty:
xbmc.sleep(100)
continue
else:
queue.task_done()
break break
updateItem['XML'] = plexXML updateItem['XML'] = plexXML
@ -106,6 +110,8 @@ class ThreadedGetMetadata(Thread):
getMetadataCount += 1 getMetadataCount += 1
# signals to queue job is done # signals to queue job is done
queue.task_done() queue.task_done()
# Empty queue in case PKC was shut down (main thread hangs otherwise)
self.terminateNow()
@utils.ThreadMethodsAdditionalStop('emby_shouldStop') @utils.ThreadMethodsAdditionalStop('emby_shouldStop')
@ -128,6 +134,17 @@ class ThreadedProcessMetadata(Thread):
self.itemType = itemType self.itemType = itemType
Thread.__init__(self) Thread.__init__(self)
def terminateNow(self):
while not self.queue.empty():
# Still try because remaining item might have been taken
try:
self.queue.get(block=False)
except Queue.Empty:
xbmc.sleep(100)
continue
else:
self.queue.task_done()
def run(self): def run(self):
# Constructs the method name, e.g. itemtypes.Movies # Constructs the method name, e.g. itemtypes.Movies
itemFkt = getattr(itemtypes, self.itemType) itemFkt = getattr(itemtypes, self.itemType)
@ -163,6 +180,10 @@ class ThreadedProcessMetadata(Thread):
processingViewName = title processingViewName = title
# signals to queue job is done # signals to queue job is done
queue.task_done() queue.task_done()
# Empty queue in case PKC was shut down (main thread hangs otherwise)
# Sleep, just in case the other threads throw another xml
xbmc.sleep(1000)
self.terminateNow()
@utils.ThreadMethodsAdditionalStop('emby_shouldStop') @utils.ThreadMethodsAdditionalStop('emby_shouldStop')
@ -273,6 +294,7 @@ class LibrarySync(Thread):
# Time offset between Kodi and PMS in seconds (=Koditime - PMStime) # Time offset between Kodi and PMS in seconds (=Koditime - PMStime)
self.timeoffset = 0 self.timeoffset = 0
self.lastSync = 0 self.lastSync = 0
self.lastTimeSync = 0
Thread.__init__(self) Thread.__init__(self)
@ -324,6 +346,8 @@ class LibrarySync(Thread):
plexId = False plexId = False
for unplayedId in unplayedIds: for unplayedId in unplayedIds:
if unplayedId not in resumeIds: if unplayedId not in resumeIds:
if self.threadStopped():
return
# Found an item we can work with! # Found an item we can work with!
kodiId = unplayedId kodiId = unplayedId
self.logMsg('Found kodiId: %s' % kodiId, 1) self.logMsg('Found kodiId: %s' % kodiId, 1)
@ -346,6 +370,8 @@ class LibrarySync(Thread):
unplayedIds = kodi_db.getUnplayedMusicItems() unplayedIds = kodi_db.getUnplayedMusicItems()
# We don't care about resuming songs in the middle # We don't care about resuming songs in the middle
for unplayedId in unplayedIds: for unplayedId in unplayedIds:
if self.threadStopped():
return
# Found an item we can work with! # Found an item we can work with!
kodiId = unplayedId kodiId = unplayedId
self.logMsg('Found kodiId: %s' % kodiId, 1) self.logMsg('Found kodiId: %s' % kodiId, 1)
@ -386,19 +412,25 @@ class LibrarySync(Thread):
containerSize=self.limitindex) containerSize=self.limitindex)
# Toggle watched state back # Toggle watched state back
PF.scrobble(plexId, 'unwatched') PF.scrobble(plexId, 'unwatched')
if items in (None, 401):
self.logMsg('Could not download all Plex leaves for library %s '
'with lastViewedAt=%s and containerSize=%s'
% (libraryId, timestamp, self.limitindex), -1)
return
# Get server timestamp for this change # Get server timestamp for this change
plextime = None plextime = None
for item in items: for item in items:
if item.attrib['ratingKey'] == plexId: if item.attrib['ratingKey'] == plexId:
plextime = item.attrib.get('lastViewedAt') plextime = item.attrib.get('lastViewedAt')
break break
if not plextime: if plextime is None:
self.logMsg("Could not set the items watched state, abort", -1) self.logMsg("Could not set the items watched state, abort", -1)
return return
# Calculate time offset Kodi-PMS # Calculate time offset Kodi-PMS
timeoffset = int(koditime) - int(plextime) timeoffset = int(koditime) - int(plextime)
utils.window('kodiplextimeoffset', value=str(timeoffset)) utils.window('kodiplextimeoffset', value=str(timeoffset))
utils.settings('kodiplextimeoffset', value=str(timeoffset))
self.logMsg("Time offset Koditime - Plextime in seconds: %s" self.logMsg("Time offset Koditime - Plextime in seconds: %s"
% str(self.timeoffset), 0) % str(self.timeoffset), 0)
@ -440,9 +472,6 @@ class LibrarySync(Thread):
# Add sources # Add sources
utils.sourcesXML() utils.sourcesXML()
# Set new timestamp NOW because sync might take a while
self.saveLastSync()
# Ensure that DBs exist if called for very first time # Ensure that DBs exist if called for very first time
self.initializeDBs() self.initializeDBs()
@ -458,9 +487,12 @@ class LibrarySync(Thread):
} }
if self.enableMusic: if self.enableMusic:
process['music'] = self.PlexMusic process['music'] = self.PlexMusic
# Do the processing
for itemtype in process: for itemtype in process:
completed = process[itemtype]() if self.threadStopped():
if not completed: return False
if not process[itemtype]():
xbmc.executebuiltin('InhibitIdleShutdown(false)') xbmc.executebuiltin('InhibitIdleShutdown(false)')
utils.setScreensaver(value=screensaver) utils.setScreensaver(value=screensaver)
return False return False
@ -848,17 +880,18 @@ class LibrarySync(Thread):
# Kill threads # Kill threads
self.logMsg("Waiting to kill threads", 1) self.logMsg("Waiting to kill threads", 1)
for thread in threads: for thread in threads:
# Threads might already have quit by themselves (e.g. Kodi exit)
try:
thread.stopThread() thread.stopThread()
except:
pass
self.logMsg("Stop sent to all threads", 1) self.logMsg("Stop sent to all threads", 1)
# Wait till threads are indeed dead # Wait till threads are indeed dead
for thread in threads: for thread in threads:
thread.join(5.0)
if thread.isAlive():
self.logMsg("Could not terminate thread", -1)
try: try:
del threads thread.join(1.0)
except: except:
self.logMsg("Could not delete threads", -1) pass
self.logMsg("Sync threads finished", 1) self.logMsg("Sync threads finished", 1)
self.updatelist = [] self.updatelist = []
@ -895,6 +928,8 @@ class LibrarySync(Thread):
if all_plexmovies is None: if all_plexmovies is None:
self.logMsg("Couldnt get section items, aborting for view.", 1) self.logMsg("Couldnt get section items, aborting for view.", 1)
continue continue
elif all_plexmovies == 401:
return False
# Populate self.updatelist and self.allPlexElementsId # Populate self.updatelist and self.allPlexElementsId
self.GetUpdatelist(all_plexmovies, self.GetUpdatelist(all_plexmovies,
itemType, itemType,
@ -903,7 +938,7 @@ class LibrarySync(Thread):
viewId) viewId)
self.GetAndProcessXMLs(itemType) self.GetAndProcessXMLs(itemType)
self.logMsg("Processed view", 1) self.logMsg("Processed view", 1)
# Update viewstate # Update viewstate for EVERY item
for view in views: for view in views:
if self.threadStopped(): if self.threadStopped():
return False return False
@ -1030,6 +1065,8 @@ class LibrarySync(Thread):
self.logMsg( self.logMsg(
"Error downloading show view xml for view %s" % viewId, -1) "Error downloading show view xml for view %s" % viewId, -1)
continue continue
elif allPlexTvShows == 401:
return False
# Populate self.updatelist and self.allPlexElementsId # Populate self.updatelist and self.allPlexElementsId
self.GetUpdatelist(allPlexTvShows, self.GetUpdatelist(allPlexTvShows,
itemType, itemType,
@ -1057,6 +1094,8 @@ class LibrarySync(Thread):
self.logMsg( self.logMsg(
"Error downloading season xml for show %s" % tvShowId, -1) "Error downloading season xml for show %s" % tvShowId, -1)
continue continue
elif seasons == 401:
return False
# Populate self.updatelist and self.allPlexElementsId # Populate self.updatelist and self.allPlexElementsId
self.GetUpdatelist(seasons, self.GetUpdatelist(seasons,
itemType, itemType,
@ -1083,6 +1122,8 @@ class LibrarySync(Thread):
"Error downloading episod xml for view %s" "Error downloading episod xml for view %s"
% view.get('name'), -1) % view.get('name'), -1)
continue continue
elif episodes == 401:
return False
# Populate self.updatelist and self.allPlexElementsId # Populate self.updatelist and self.allPlexElementsId
self.GetUpdatelist(episodes, self.GetUpdatelist(episodes,
itemType, itemType,
@ -1108,6 +1149,8 @@ class LibrarySync(Thread):
# Update viewstate: # Update viewstate:
for view in views: for view in views:
if self.threadStopped():
return False
self.PlexUpdateWatched(view['id'], itemType) self.PlexUpdateWatched(view['id'], itemType)
if self.compare: if self.compare:
@ -1140,10 +1183,13 @@ class LibrarySync(Thread):
# Process artist, then album and tracks last to minimize overhead # Process artist, then album and tracks last to minimize overhead
for kind in ('MusicArtist', 'MusicAlbum', 'Audio'): for kind in ('MusicArtist', 'MusicAlbum', 'Audio'):
if self.threadStopped(): if self.threadStopped():
return True return False
self.logMsg("Start processing music %s" % kind, 1) self.logMsg("Start processing music %s" % kind, 1)
self.ProcessMusic( if self.ProcessMusic(views,
views, kind, urlArgs[kind], methods[kind]) kind,
urlArgs[kind],
methods[kind]) is False:
return False
self.logMsg("Processing of music %s done" % kind, 1) self.logMsg("Processing of music %s done" % kind, 1)
self.GetAndProcessXMLs(itemType) self.GetAndProcessXMLs(itemType)
self.logMsg("GetAndProcessXMLs for music %s completed" % kind, 1) self.logMsg("GetAndProcessXMLs for music %s completed" % kind, 1)
@ -1173,7 +1219,7 @@ class LibrarySync(Thread):
for view in views: for view in views:
if self.threadStopped(): if self.threadStopped():
return True return False
# Get items per view # Get items per view
viewId = view['id'] viewId = view['id']
viewName = view['name'] viewName = view['name']
@ -1183,6 +1229,8 @@ class LibrarySync(Thread):
self.logMsg("Error downloading xml for view %s" self.logMsg("Error downloading xml for view %s"
% viewId, -1) % viewId, -1)
continue continue
elif itemsXML == 401:
return False
# Populate self.updatelist and self.allPlexElementsId # Populate self.updatelist and self.allPlexElementsId
self.GetUpdatelist(itemsXML, self.GetUpdatelist(itemsXML,
'Music', 'Music',
@ -1235,7 +1283,7 @@ class LibrarySync(Thread):
def multi_delete(self, liste, deleteListe): def multi_delete(self, liste, deleteListe):
""" """
Deletes the list items of liste at the positions in deleteListe Deletes the list items of liste at the positions in deleteListe
(arbitrary order) (which can be in any arbitrary order)
""" """
indexes = sorted(deleteListe, reverse=True) indexes = sorted(deleteListe, reverse=True)
for index in indexes: for index in indexes:
@ -1299,7 +1347,7 @@ class LibrarySync(Thread):
def process_newitems(self, item): def process_newitems(self, item):
ratingKey = item['ratingKey'] ratingKey = item['ratingKey']
xml = PF.GetPlexMetadata(ratingKey) xml = PF.GetPlexMetadata(ratingKey)
if xml is None or xml == 401: if xml in (None, 401):
self.logMsg('Could not download metadata for %s, skipping' self.logMsg('Could not download metadata for %s, skipping'
% ratingKey, -1) % ratingKey, -1)
return False return False
@ -1329,7 +1377,6 @@ class LibrarySync(Thread):
def process_deleteditems(self, item): def process_deleteditems(self, item):
if item.get('type') == 1: if item.get('type') == 1:
# Movie
self.logMsg("Removing movie %s" % item.get('ratingKey'), 1) self.logMsg("Removing movie %s" % item.get('ratingKey'), 1)
self.videoLibUpdate = True self.videoLibUpdate = True
with itemtypes.Movies() as movie: with itemtypes.Movies() as movie:
@ -1351,7 +1398,7 @@ class LibrarySync(Thread):
def process_timeline(self, data): def process_timeline(self, data):
""" """
PMS is messing with the library items, e.g. new or changed. Put in our PMS is messing with the library items, e.g. new or changed. Put in our
"processing queue" "processing queue" for later
""" """
for item in data: for item in data:
state = item.get('state') state = item.get('state')
@ -1423,7 +1470,7 @@ class LibrarySync(Thread):
# viewCount # viewCount
if currSess.get('duration') is None: if currSess.get('duration') is None:
xml = PF.GetPlexMetadata(ratingKey) xml = PF.GetPlexMetadata(ratingKey)
if xml is None or xml == 401: if xml in (None, 401):
self.logMsg('Could not get up-to-date xml for item %s' self.logMsg('Could not get up-to-date xml for item %s'
% ratingKey, -1) % ratingKey, -1)
continue continue
@ -1448,18 +1495,13 @@ class LibrarySync(Thread):
% (utils.window('plex_username'), % (utils.window('plex_username'),
utils.window('currUserId'), utils.window('currUserId'),
items[-1]), 2) items[-1]), 2)
# Now tell Kodi where we are
for item in items: for item in items:
itemFkt = getattr(itemtypes, itemFkt = getattr(itemtypes,
PF.GetItemClassFromType(item['kodi_type'])) PF.GetItemClassFromType(item['kodi_type']))
with itemFkt() as Fkt: with itemFkt() as Fkt:
Fkt.updatePlaystate(item) Fkt.updatePlaystate(item)
def saveLastSync(self):
"""
Save last full sync time
"""
self.lastSync = utils.getUnixTimestamp()
def run(self): def run(self):
try: try:
self.run_internal() self.run_internal()
@ -1467,10 +1509,11 @@ class LibrarySync(Thread):
utils.window('emby_dbScan', clear=True) utils.window('emby_dbScan', clear=True)
self.logMsg('LibrarySync thread crashed', -1) self.logMsg('LibrarySync thread crashed', -1)
self.logMsg('Error message: %s' % e, -1) self.logMsg('Error message: %s' % e, -1)
import traceback
self.logMsg("Traceback:\n%s" % traceback.format_exc(), -1)
# Library sync thread has crashed # Library sync thread has crashed
self.dialog.ok( self.dialog.ok(self.addonName,
heading=self.addonName, self.__language__(39400))
line1=self.__language__(39400))
raise raise
def run_internal(self): def run_internal(self):
@ -1484,9 +1527,13 @@ class LibrarySync(Thread):
enableBackgroundSync = self.enableBackgroundSync enableBackgroundSync = self.enableBackgroundSync
fullSync = self.fullSync fullSync = self.fullSync
processMessage = self.processMessage processMessage = self.processMessage
processItems = self.processItems
string = self.__language__ string = self.__language__
fullSyncInterval = self.fullSyncInterval fullSyncInterval = self.fullSyncInterval
lastSync = self.lastSync
lastTimeSync = self.lastTimeSync
lastProcessing = 0 lastProcessing = 0
oneDay = 60*60*24
xbmcplayer = xbmc.Player() xbmcplayer = xbmc.Player()
@ -1499,7 +1546,7 @@ class LibrarySync(Thread):
log("---===### Starting LibrarySync ###===---", 0) log("---===### Starting LibrarySync ###===---", 0)
while not threadStopped(): while not threadStopped():
# In the event the server goes offline, or an item is playing # In the event the server goes offline
while threadSuspended(): while threadSuspended():
# Set in service.py # Set in service.py
if threadStopped(): if threadStopped():
@ -1596,7 +1643,7 @@ class LibrarySync(Thread):
# Remove video nodes # Remove video nodes
utils.deleteNodes() utils.deleteNodes()
# Kick off refresh # Kick off refresh
if self.maintainViews(): if self.maintainViews() is True:
# Ran successfully # Ran successfully
log("Refresh playlists/nodes completed", 0) log("Refresh playlists/nodes completed", 0)
# "Plex playlists/nodes refreshed" # "Plex playlists/nodes refreshed"
@ -1611,24 +1658,33 @@ class LibrarySync(Thread):
window('emby_dbScan', clear=True) window('emby_dbScan', clear=True)
else: else:
now = utils.getUnixTimestamp() now = utils.getUnixTimestamp()
if (now - self.lastSync > fullSyncInterval and if (now - lastSync > fullSyncInterval and
not xbmcplayer.isPlaying()): not xbmcplayer.isPlaying()):
lastSync = now
log('Doing scheduled full library scan', 1) log('Doing scheduled full library scan', 1)
# Recalculate time offset Kodi - PMS
self.syncPMStime()
window('emby_dbScan', value="true") window('emby_dbScan', value="true")
log('Running background full lib scan', 0) if fullSync() is False and not threadStopped():
fullSync() log('Could not finish scheduled full sync', -1)
self.showKodiNote(string(39410),
forced=True,
icon='error')
window('emby_dbScan', clear=True) window('emby_dbScan', clear=True)
# Full library sync finished # Full library sync finished
self.showKodiNote(string(39407), forced=False) self.showKodiNote(string(39407), forced=False)
elif now - lastTimeSync > oneDay:
lastTimeSync = now
log('Starting daily time sync', 0)
window('emby_dbScan', value="true")
self.syncPMStime()
window('emby_dbScan', clear=True)
elif enableBackgroundSync: elif enableBackgroundSync:
# Check back whether we should process something # Check back whether we should process something
# Only do this once every 10 seconds
if now - lastProcessing > 10: if now - lastProcessing > 10:
window('emby_dbScan', value="true")
self.processItems()
window('emby_dbScan', clear=True)
lastProcessing = now lastProcessing = now
window('emby_dbScan', value="true")
processItems()
window('emby_dbScan', clear=True)
# See if there is a PMS message we need to handle # See if there is a PMS message we need to handle
try: try:
message = queue.get(block=False) message = queue.get(block=False)
@ -1643,6 +1699,9 @@ class LibrarySync(Thread):
window('emby_dbScan', clear=True) window('emby_dbScan', clear=True)
# NO sleep! # NO sleep!
continue continue
else:
# Still sleep if backgroundsync disabled
xbmc.sleep(100)
xbmc.sleep(100) xbmc.sleep(100)

View file

@ -76,6 +76,7 @@
<setting type="lsep" label="30523" visible="false"/> <!-- Music metadata options --> <setting type="lsep" label="30523" visible="false"/> <!-- Music metadata options -->
<setting id="enableImportSongRating" type="bool" label="30524" default="true" visible="false"/> <setting id="enableImportSongRating" type="bool" label="30524" default="true" visible="false"/>
<setting id="enableExportSongRating" type="bool" label="30525" default="false" visible="false" /> <setting id="enableExportSongRating" type="bool" label="30525" default="false" visible="false" />
<setting id="kodiplextimeoffset" type="number" label="Time difference in seconds (Koditime - Plextime)" default="0" visible="false" option="int" />
<setting id="enableUpdateSongRating" type="bool" label="30526" default="false" visible="false" /> <setting id="enableUpdateSongRating" type="bool" label="30526" default="false" visible="false" />
<setting id="emby_pathverified" type="bool" default="false" visible="false" /> <!-- If 'false': one single warning message pops up if PKC cannot verify direct paths --> <setting id="emby_pathverified" type="bool" default="false" visible="false" /> <!-- If 'false': one single warning message pops up if PKC cannot verify direct paths -->
</category> </category>