diff --git a/resources/language/resource.language.en_gb/strings.po b/resources/language/resource.language.en_gb/strings.po index 799bb7f5..842f3089 100644 --- a/resources/language/resource.language.en_gb/strings.po +++ b/resources/language/resource.language.en_gb/strings.po @@ -513,6 +513,11 @@ msgctxt "#30522" msgid "Force transcode h265/HEVC" msgstr "" +# PKC Settings - Sync Options +msgctxt "#30523" +msgid "Also show sync progress for playstate and user data" +msgstr "" + # PKC Settings - Playback msgctxt "#30527" msgid "Ignore specials in next episodes" diff --git a/resources/lib/kodi_db/music.py b/resources/lib/kodi_db/music.py index c42e03c4..7dbedf2c 100644 --- a/resources/lib/kodi_db/music.py +++ b/resources/lib/kodi_db/music.py @@ -476,6 +476,8 @@ class KodiMusicDB(common.KodiDBBase): elif kodi_type == v.KODI_TYPE_ALBUM: column = 'iUserrating' identifier = 'idAlbum' + else: + return self.cursor.execute('''UPDATE %s SET %s = ? WHERE ? = ?''' % (kodi_type, column), (userrating, identifier, kodi_id)) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index e11069f5..2f78677d 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -5,13 +5,13 @@ from logging import getLogger import Queue import copy +import xbmcgui + from cProfile import Profile from pstats import Stats from StringIO import StringIO from .get_metadata import GetMetadataTask, reset_collections -from .process_metadata import InitNewSection, UpdateLastSyncAndPlaystate, \ - ProcessMetadata, DeleteItem from . import common, sections from .. import utils, timing, backgroundthread, variables as v, app from .. import plex_functions as PF, itemtypes @@ -26,6 +26,45 @@ else: PLAYLIST_SYNC_ENABLED = False LOG = getLogger('PLEX.sync.full_sync') +# How many items will be put through the processing chain at once? +BATCH_SIZE = 2000 +# Safety margin to filter PMS items - how many seconds to look into the past? +UPDATED_AT_SAFETY = 60 * 5 +LAST_VIEWED_AT_SAFETY = 60 * 5 + + +def tag_last(iterable): + """ + Given some iterable, returns (last, item), where last is only True if you + are on the final iteration. + """ + iterator = iter(iterable) + gotone = False + try: + lookback = next(iterator) + gotone = True + while True: + cur = next(iterator) + yield False, lookback + lookback = cur + except StopIteration: + if gotone: + yield True, lookback + raise StopIteration() + + +class InitNewSection(object): + """ + Throw this into the queue used for ProcessMetadata to tell it which + Plex library section we're looking at + """ + def __init__(self, context, total_number_of_items, section_name, + section_id, plex_type): + self.context = context + self.total = total_number_of_items + self.name = section_name + self.id = section_id + self.plex_type = plex_type class FullSync(common.libsync_mixin): @@ -36,19 +75,41 @@ class FullSync(common.libsync_mixin): self._canceled = False self.repair = repair self.callback = callback - self.show_dialog = show_dialog self.queue = None self.process_thread = None self.current_sync = None self.plexdb = None self.plex_type = None self.section_type = None - self.processing_thread = None + self.worker_count = int(utils.settings('syncThreadNumber')) + self.item_count = 0 + # For progress dialog + self.show_dialog = show_dialog + self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true' + self.dialog = None + self.total = 0 + self.current = 1 + self.processed = 0 + self.title = '' + self.section = None + self.section_name = None self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' self.threader = backgroundthread.ThreaderManager( - worker=backgroundthread.NonstoppingBackgroundWorker) + worker=backgroundthread.NonstoppingBackgroundWorker, + worker_count=self.worker_count) super(FullSync, self).__init__() + def update_progressbar(self): + if self.show_dialog: + try: + progress = int(float(self.current) / float(self.total) * 100.0) + except ZeroDivisionError: + progress = 0 + self.dialog.update(progress, + '%s (%s)' % (self.section_name, self.section_type_text), + '%s/%s %s' + % (self.current, self.total, self.title)) + def process_item(self, xml_item): """ Processes a single library item @@ -58,30 +119,87 @@ class FullSync(common.libsync_mixin): int('%s%s' % (plex_id, xml_item.get('updatedAt', xml_item.get('addedAt', 1541572987)))): - # Already got EXACTLY this item in our DB. BUT need to collect all - # DB updates within the same thread - self.queue.put(UpdateLastSyncAndPlaystate(plex_id, xml_item)) return - task = GetMetadataTask() - task.setup(self.queue, plex_id, self.plex_type, self.get_children) - self.threader.addTask(task) + self.threader.addTask(GetMetadataTask(self.queue, + plex_id, + self.plex_type, + self.get_children)) + self.item_count += 1 - def process_delete(self): + def process_playstate(self, xml_item): """ - Removes all the items that have NOT been updated (last_sync timestamp - is different) + Processes the playstate of a single library item """ - for plex_id in self.plexdb.plex_id_by_last_sync(self.plex_type, - self.current_sync): - if self.isCanceled(): - return - self.queue.put(DeleteItem(plex_id)) + plex_id = int(xml_item.get('ratingKey')) + if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \ + int('%s%s' % (plex_id, + xml_item.get('updatedAt', + xml_item.get('addedAt', 1541572987)))): + return + self.threader.addTask(GetMetadataTask(self.queue, + plex_id, + self.plex_type, + self.get_children)) + self.item_count += 1 + + def update_library(self): + LOG.debug('Writing changes to Kodi library now') + i = 0 + if not self.section: + self.section = self.queue.get() + self.queue.task_done() + while not self.isCanceled() and self.item_count > 0: + section = self.section + if not section: + break + LOG.debug('Start or continue processing section %s (%ss)', + section.name, section.plex_type) + self.current = 1 + self.processed = 0 + self.total = section.total + self.section_name = section.name + self.section_type_text = utils.lang( + v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) + with section.context(self.current_sync) as context: + while not self.isCanceled() and self.item_count > 0: + try: + item = self.queue.get(block=False) + except backgroundthread.Queue.Empty: + if self.threader.threader.working(): + app.APP.monitor.waitForAbort(0.02) + continue + else: + # Try again, in case a thread just finished + i += 1 + if i == 3: + break + continue + i = 0 + self.queue.task_done() + if isinstance(item, dict): + context.add_update(item['xml'][0], + section_name=section.name, + section_id=section.id, + children=item['children']) + self.title = item['xml'][0].get('title') + self.processed += 1 + elif isinstance(item, InitNewSection) or item is None: + self.section = item + break + else: + raise ValueError('Unknown type %s' % type(item)) + self.item_count -= 1 + self.update_progressbar() + self.current += 1 + if self.processed == 500: + self.processed = 0 + context.commit() + LOG.debug('Done writing changes to Kodi library') @utils.log_time - def process_section(self, section): - LOG.debug('Processing library section %s', section) - if self.isCanceled(): - return False + def addupdate_section(self, section): + LOG.debug('Processing library section for new or changed items %s', + section) if not self.install_sync_done: app.SYNC.path_verified = False try: @@ -94,45 +212,75 @@ class FullSync(common.libsync_mixin): section['section_id'], section['plex_type']) self.queue.put(queue_info) - with PlexDB() as self.plexdb: - for xml_item in iterator: - if self.isCanceled(): - return False - self.process_item(xml_item) + last = True + # To keep track of the item-number in order to kill while loops + self.item_count = 0 + while True: + # Check Plex DB to see what we need to add/update + with PlexDB() as self.plexdb: + for i, (last, xml_item) in enumerate(tag_last(iterator)): + if self.isCanceled(): + return False + self.process_item(xml_item) + if self.item_count == BATCH_SIZE: + break + # Make sure Plex DB above is closed before adding/updating + if self.item_count == BATCH_SIZE: + self.update_library() + if last: + break + self.update_library() + reset_collections() + return True except RuntimeError: LOG.error('Could not entirely process section %s', section) return False - LOG.debug('Waiting for download threads to finish') - while self.threader.threader.working(): - app.APP.monitor.waitForAbort(0.1) - reset_collections() + + @utils.log_time + def playstate_per_section(self, section): + LOG.debug('Processing playstate for library section %s', section) try: - # Tell the processing thread that we're syncing playstate + # Sync new, updated and deleted items + iterator = section['iterator'] + # Tell the processing thread about this new section queue_info = InitNewSection(section['context'], iterator.total, - iterator.get('librarySectionTitle'), + section['section_name'], section['section_id'], section['plex_type']) self.queue.put(queue_info) - LOG.debug('Waiting for processing thread to finish section') - # Make sure that the processing thread commits all changes - self.queue.join() - with PlexDB() as self.plexdb: - # Delete movies that are not on Plex anymore - LOG.debug('Look for items to delete') - self.process_delete() - # Wait again till the processing thread is done - self.queue.join() + self.total = iterator.total + self.section_name = section['section_name'] + self.section_type_text = utils.lang( + v.TRANSLATION_FROM_PLEXTYPE[section['plex_type']]) + self.current = 0 + with section['context'](self.current_sync) as itemtype: + for xml_item in iterator: + if self.isCanceled(): + return False + itemtype.update_userdata(xml_item, section['plex_type']) + itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']), + section['plex_type'], + self.current_sync) + self.current += 1 + self.update_progressbar() + return True except RuntimeError: - LOG.error('Could not process playstate for section %s', section) + LOG.error('Could not entirely process section %s', section) return False - LOG.debug('Done processing playstate for section') - return True - def threaded_get_iterators(self, kinds, queue): + def threaded_get_iterators(self, kinds, queue, updated_at=None, + last_viewed_at=None): """ PF.SectionItems is costly, so let's do it asynchronous """ + if self.repair: + updated_at = None + last_viewed_at = None + else: + updated_at = updated_at - UPDATED_AT_SAFETY if updated_at else None + last_viewed_at = last_viewed_at - LAST_VIEWED_AT_SAFETY \ + if last_viewed_at else None try: for kind in kinds: for section in (x for x in sections.SECTIONS @@ -146,7 +294,9 @@ class FullSync(common.libsync_mixin): element['context'] = kind[2] element['get_children'] = kind[3] element['iterator'] = PF.SectionItems(section['section_id'], - plex_type=kind[0]) + plex_type=kind[0], + updated_at=updated_at, + last_viewed_at=last_viewed_at) queue.put(element) finally: queue.put(None) @@ -165,16 +315,24 @@ class FullSync(common.libsync_mixin): (v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False), (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True), ]) + # ADD NEW ITEMS # Already start setting up the iterators. We need to enforce # syncing e.g. show before season before episode + if not self.show_dialog_userdata and self.dialog: + # Close the progress indicator dialog + self.dialog.close() + self.dialog = None iterator_queue = Queue.Queue() + updated_at = int(utils.settings('lastfullsync')) or None task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, None, kinds, - iterator_queue) + iterator_queue, + updated_at=updated_at) backgroundthread.BGThreader.addTask(task) while True: section = iterator_queue.get() + iterator_queue.task_done() if section is None: break # Setup our variables @@ -183,53 +341,76 @@ class FullSync(common.libsync_mixin): self.context = section['context'] self.get_children = section['get_children'] # Now do the heavy lifting - if self.isCanceled() or not self.process_section(section): + if self.isCanceled() or not self.addupdate_section(section): return False + # SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that + # were set to unwatched). Also mark all items on the PMS to be able + # to delete the ones still in Kodi + LOG.info('Start synching playstate and userdata for every item') + task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, + None, + kinds, + iterator_queue) + backgroundthread.BGThreader.addTask(task) + while True: + section = iterator_queue.get() iterator_queue.task_done() + if section is None: + break + # Setup our variables + self.plex_type = section['plex_type'] + self.section_type = section['section_type'] + self.context = section['context'] + self.get_children = section['get_children'] + # Now do the heavy lifting + if self.isCanceled() or not self.playstate_per_section(section): + return False + # Delete movies that are not on Plex anymore + LOG.info('Looking for items to delete') + with section['context'](self.current_sync) as context: + for plex_id in context.plexdb.plex_id_by_last_sync(self.plex_type, + self.current_sync): + if self.isCanceled(): + return False + context.remove(plex_id, self.plex_type) + LOG.debug('Done deleting') return True @utils.log_time def run(self): profile = Profile() profile.enable() - if self.isCanceled(): - return - successful = False - self.current_sync = timing.unix_timestamp() + self.current_sync = timing.plex_now() # Delete playlist and video node files from Kodi utils.delete_playlists() utils.delete_nodes() # Get latest Plex libraries and build playlist and video node files if not sections.sync_from_pms(): return + successful = False try: - # Fire up our single processing thread - self.queue = backgroundthread.Queue.Queue(maxsize=1000) - self.processing_thread = ProcessMetadata(self.queue, - self.current_sync, - self.show_dialog) - self.processing_thread.start() + self.queue = backgroundthread.Queue.Queue() + if self.show_dialog: + self.dialog = xbmcgui.DialogProgressBG() + self.dialog.create(utils.lang(39714)) # Actual syncing - do only new items first LOG.info('Running full_library_sync with repair=%s', self.repair) if not self.full_library_sync(): return - # Tell the processing thread to exit with one last element None - self.queue.put(None) if self.isCanceled(): return if PLAYLIST_SYNC_ENABLED and not playlists.full_sync(): return successful = True - except: - utils.ERROR(txt='full_sync.py crashed', notify=True) finally: - # This will block until the processing thread really exits - LOG.debug('Waiting for processing thread to exit') - self.processing_thread.join() common.update_kodi_library(video=True, music=True) + if self.dialog: + self.dialog.close() self.threader.shutdown() + if successful: + utils.settings('lastfullsync', value=str(int(self.current_sync))) if self.callback: self.callback(successful) LOG.info('Done full_sync') diff --git a/resources/lib/library_sync/get_metadata.py b/resources/lib/library_sync/get_metadata.py index 5afe88a2..8f4fe780 100644 --- a/resources/lib/library_sync/get_metadata.py +++ b/resources/lib/library_sync/get_metadata.py @@ -36,11 +36,12 @@ class GetMetadataTask(common.libsync_mixin, backgroundthread.Task): queue Queue.Queue() object where this thread will store the downloaded metadata XMLs as etree objects """ - def setup(self, queue, plex_id, plex_type, get_children=False): + def __init__(self, queue, plex_id, plex_type, get_children=False): self.queue = queue self.plex_id = plex_id self.plex_type = plex_type self.get_children = get_children + super(GetMetadataTask, self).__init__() def _collections(self, item): global COLLECTION_MATCH, COLLECTION_XMLS diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py deleted file mode 100644 index abdd8be2..00000000 --- a/resources/lib/library_sync/process_metadata.py +++ /dev/null @@ -1,146 +0,0 @@ -# -*- coding: utf-8 -*- -from __future__ import absolute_import, division, unicode_literals -from logging import getLogger -import xbmcgui - -from cProfile import Profile -from pstats import Stats -from StringIO import StringIO - -from . import common -from .. import backgroundthread, utils, variables as v - -LOG = getLogger('PLEX.sync.process_metadata') - - -class InitNewSection(object): - """ - Throw this into the queue used for ProcessMetadata to tell it which - Plex library section we're looking at - """ - def __init__(self, context, total_number_of_items, section_name, - section_id, plex_type): - self.context = context - self.total = total_number_of_items - self.name = section_name - self.id = section_id - self.plex_type = plex_type - - -class UpdateLastSyncAndPlaystate(object): - def __init__(self, plex_id, xml_item): - self.plex_id = plex_id - self.xml_item = xml_item - - -class DeleteItem(object): - def __init__(self, plex_id): - self.plex_id = plex_id - - -class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread): - """ - Not yet implemented for more than 1 thread - if ever. Only to be called by - ONE thread! - Processes the XML metadata in the queue - """ - def __init__(self, queue, last_sync, show_dialog): - self._canceled = False - self.queue = queue - self.last_sync = last_sync - self.show_dialog = show_dialog - self.total = 0 - self.current = 1 - self.processed = 0 - self.title = '' - self.section_name = None - self.dialog = None - super(ProcessMetadata, self).__init__() - - def update_progressbar(self): - if self.show_dialog: - try: - progress = int(float(self.current) / float(self.total) * 100.0) - except ZeroDivisionError: - progress = 0 - self.dialog.update(progress, - '%s (%s)' % (self.section_name, self.section_type_text), - '%s/%s %s' - % (self.current, self.total, self.title)) - - def run(self): - LOG.debug('Processing thread started') - if self.show_dialog: - self.dialog = xbmcgui.DialogProgressBG() - self.dialog.create(utils.lang(39714)) - try: - self._run() - except: - utils.ERROR(notify=True, cancel_sync=True) - finally: - if self.dialog: - self.dialog.close() - while not self.queue.empty(): - # We need to empty the queue to let full_sync finish join() - self.queue.get() - self.queue.task_done() - LOG.debug('Processing thread terminated') - - def _run(self): - """ - Do the work - """ - # Init with the very first library section. This will block! - section = self.queue.get() - self.queue.task_done() - if section is None: - return - while not self.isCanceled(): - if section is None: - break - LOG.debug('Start processing section %s (%ss)', - section.name, section.plex_type) - self.current = 1 - self.processed = 0 - self.total = section.total - self.section_name = section.name - self.section_type_text = utils.lang( - v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) - profile = Profile() - profile.enable() - with section.context(self.last_sync) as context: - while not self.isCanceled(): - # grabs item from queue. This will block! - item = self.queue.get() - if isinstance(item, dict): - context.add_update(item['xml'][0], - section_name=section.name, - section_id=section.id, - children=item['children']) - self.title = item['xml'][0].get('title') - self.processed += 1 - elif isinstance(item, UpdateLastSyncAndPlaystate): - context.plexdb.update_last_sync(item.plex_id, - section.plex_type, - self.last_sync) - if section.plex_type != v.PLEX_TYPE_ARTIST: - context.update_userdata(item.xml_item, - section.plex_type) - elif isinstance(item, InitNewSection) or item is None: - section = item - break - else: - context.remove(item.plex_id, plex_type=section.plex_type) - self.update_progressbar() - self.current += 1 - if self.processed == 500: - self.processed = 0 - context.commit() - self.queue.task_done() - self.queue.task_done() - profile.disable() - string_io = StringIO() - stats = Stats(profile, stream=string_io).sort_stats('cumulative') - stats.print_stats() - LOG.info('cProfile result: ') - LOG.info(string_io.getvalue()) diff --git a/resources/lib/timing.py b/resources/lib/timing.py index 48c8e946..079b6807 100644 --- a/resources/lib/timing.py +++ b/resources/lib/timing.py @@ -46,6 +46,14 @@ def plex_date_to_kodi(plex_timestamp): localtime(float(plex_timestamp) + KODI_PLEX_TIME_OFFSET)) +def kodi_date_to_plex(kodi_timestamp): + return float(kodi_timestamp) - KODI_PLEX_TIME_OFFSET + + +def plex_now(): + return kodi_date_to_plex(unix_timestamp()) + + def kodi_timestamp(plex_timestamp): return unix_date_to_kodi(plex_timestamp) diff --git a/resources/lib/utils.py b/resources/lib/utils.py index 53026ff2..f9a18e2c 100644 --- a/resources/lib/utils.py +++ b/resources/lib/utils.py @@ -477,6 +477,7 @@ def wipe_database(): kodi_db.reset_cached_images() # reset the install run flag settings('SyncInstallRunDone', value="false") + settings('lastfullsync', value="0") LOG.info('Wiping done') diff --git a/resources/settings.xml b/resources/settings.xml index 57fc4cce..812885dd 100644 --- a/resources/settings.xml +++ b/resources/settings.xml @@ -57,6 +57,7 @@ + @@ -82,6 +83,7 @@ +