diff --git a/resources/lib/itemtypes/common.py b/resources/lib/itemtypes/common.py index 2b80869c..f0dc3b4a 100644 --- a/resources/lib/itemtypes/common.py +++ b/resources/lib/itemtypes/common.py @@ -44,7 +44,8 @@ class ItemBase(object): Input: kodiType: optional argument; e.g. 'video' or 'music' """ - def __init__(self, plex_db=None, kodi_db=None): + def __init__(self, last_sync, plex_db=None, kodi_db=None): + self.last_sync = last_sync self.artwork = artwork.Artwork() self.plexconn = None self.plexcursor = plex_db.plexcursor if plex_db else None diff --git a/resources/lib/itemtypes/tvshows.py b/resources/lib/itemtypes/tvshows.py index 9872e615..055cf88d 100644 --- a/resources/lib/itemtypes/tvshows.py +++ b/resources/lib/itemtypes/tvshows.py @@ -134,7 +134,8 @@ class Show(ItemBase, TvShowMixin): """ For Plex library-type TV shows """ - def add_update(self, xml, viewtag=None, viewid=None): + def add_update(self, xml, section_name=None, section_id=None, + children=None): """ Process a single show """ @@ -199,15 +200,6 @@ class Show(ItemBase, TvShowMixin): if update_item: LOG.info("UPDATE tvshow plex_id: %s - Title: %s", plex_id, api.title()) - # Add reference is idempotent; the call here updates also fileid - # and path_id when item is moved or renamed - self.plex_db.addReference(plex_id, - v.PLEX_TYPE_SHOW, - kodi_id, - v.KODI_TYPE_SHOW, - kodi_pathid=path_id, - checksum=api.checksum(), - view_id=viewid) # update new ratings Kodi 17 rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW) self.kodi_db.update_ratings(kodi_id, @@ -248,13 +240,7 @@ class Show(ItemBase, TvShowMixin): query = "INSERT INTO tvshowlinkpath(idShow, idPath) values (?, ?)" self.kodicursor.execute(query, (kodi_id, path_id)) # Create the reference in plex table - self.plex_db.addReference(plex_id, - v.PLEX_TYPE_SHOW, - kodi_id, - v.KODI_TYPE_SHOW, - kodi_pathid=path_id, - checksum=api.checksum(), - view_id=viewid) + rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW) self.kodi_db.add_ratings(rating_id, kodi_id, @@ -295,13 +281,22 @@ class Show(ItemBase, TvShowMixin): # Process studios self.kodi_db.modify_studios(kodi_id, v.KODI_TYPE_SHOW, studios) # Process tags: view, PMS collection tags - tags = [viewtag] + tags = [section_name] tags.extend([i for _, i in api.collection_list()]) self.kodi_db.modify_tags(kodi_id, v.KODI_TYPE_SHOW, tags) + self.plex_db.addReference(plex_id, + v.PLEX_TYPE_SHOW, + kodi_id, + v.KODI_TYPE_SHOW, + kodi_pathid=path_id, + checksum=api.checksum(), + view_id=section_id, + last_sync=self.last_sync) class Season(ItemBase, TvShowMixin): - def add_update(self, xml, viewtag=None, viewid=None): + def add_update(self, xml, section_name=None, section_id=None, + children=None): """ Process a single season of a certain tv show """ @@ -334,12 +329,14 @@ class Season(ItemBase, TvShowMixin): kodi_id, v.KODI_TYPE_SEASON, parent_id=show_id, - view_id=viewid, - checksum=api.checksum()) + view_id=section_id, + checksum=api.checksum(), + last_sync=self.last_sync) class Episode(ItemBase, TvShowMixin): - def add_update(self, xml, viewtag=None, viewid=None): + def add_update(self, xml, section_name=None, section_id=None, + children=None): """ Process single episode """ @@ -501,18 +498,6 @@ class Episode(ItemBase, TvShowMixin): airs_before_season, airs_before_episode, playurl, path_id, season_id, userdata['UserRating'])) - # Create or update the reference in plex table Add reference is - # idempotent; the call here updates also file_id and path_id when item - # is moved or renamed - self.plex_db.addReference(plex_id, - v.PLEX_TYPE_EPISODE, - kodi_id, - v.KODI_TYPE_EPISODE, - kodi_file_id=file_id, - kodi_pathid=path_id, - parent_id=season_id, - checksum=api.checksum(), - view_id=viewid) self.kodi_db.modify_people(kodi_id, v.KODI_TYPE_EPISODE, api.people_list()) @@ -546,3 +531,13 @@ class Episode(ItemBase, TvShowMixin): userdata['PlayCount'], userdata['LastPlayedDate'], None) # Do send None - 2nd entry + self.plex_db.addReference(plex_id, + v.PLEX_TYPE_EPISODE, + kodi_id, + v.KODI_TYPE_EPISODE, + kodi_file_id=file_id, + kodi_pathid=path_id, + parent_id=season_id, + checksum=api.checksum(), + view_id=section_id, + last_sync=self.last_sync) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 249c07fa..20d2dc5d 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -2,8 +2,7 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, division, unicode_literals from logging import getLogger -import threading -import Queue +import time from . import common, process_metadata, sections from .get_metadata import GetMetadataTask @@ -11,10 +10,6 @@ from .. import utils, backgroundthread, playlists, variables as v, state from .. import plex_functions as PF, itemtypes LOG = getLogger('PLEX.library_sync.full_sync') -DOWNLOAD_QUEUE = Queue.Queue(maxsize=500) -PROCESS_QUEUE = Queue.Queue(maxsize=100) -FANARTQUEUE = Queue.Queue() -THREADS = [] def start(repair, callback): @@ -24,42 +19,51 @@ def start(repair, callback): FullSync(repair, callback).start() -class FullSync(threading.Thread, common.libsync_mixin): +class FullSync(backgroundthread.KillableThread, common.libsync_mixin): def __init__(self, repair, callback): """ repair=True: force sync EVERY item """ self.repair = repair self.callback = callback + self.queue = None + self.process_thread = None + self.last_sync = None + self.plex_db = None super(FullSync, self).__init__() - def process_item(self, xml_item, section): + def process_item(self, xml_item, get_children): """ Processes a single library item """ - plex_id = xml_item.get('ratingKey') - if plex_id is None: - # Skipping items 'title=All episodes' without a 'ratingKey' - return + plex_id = int(xml_item['ratingKey']) if self.new_items_only: if self.plex_db.check_plexid(plex_id) is None: backgroundthread.BGThreader.addTask( - GetMetadataTask().setup(PROCESS_QUEUE, + GetMetadataTask().setup(self.queue, plex_id, - section)) + get_children)) else: if self.plex_db.check_checksum( - 'K%s%s' % (plex_id, xml_item.get('updatedAt', ''))) is None: - pass + int('%s%s' % (xml_item['ratingKey'], + xml_item['updatedAt']))) is None: + backgroundthread.BGThreader.addTask( + GetMetadataTask().setup(self.queue, + plex_id, + get_children)) + else: + self.plex_db.update_last_sync(plex_id, self.last_sync) - def plex_movies(self): + @utils.log_time + def process_kind(self, kind): """ - Syncs movies + kind is a tuple: (, + kodi_type, + , + get_children) """ - LOG.debug('Processing Plex movies') - sections = (x for x in sections.SECTIONS - if x['kodi_type'] == v.KODI_TYPE_MOVIE) - self.queue = Queue.Queue(maxsize=200) + LOG.debug('Start processing %s', kind[0]) + sections = (x for x in sections.SECTIONS if x['kodi_type'] == kind[1]) for section in sections: LOG.debug('Processing library section %s', section) if self.isCanceled(): @@ -68,43 +72,21 @@ class FullSync(threading.Thread, common.libsync_mixin): state.PATH_VERIFIED = False try: iterator = PF.PlexSectionItems(section['id']) - t = process_metadata.ProcessMetadata( - self.queue, - itemtypes.Movie, - utils.cast(int, iterator.get('totalSize', 0))) - for xml_item in PF.plex_section_items_generator(section['id']): + # Tell the processing thread about this new section + queue_info = process_metadata.InitNewSection( + kind[2], + utils.cast(int, iterator.get('totalSize', 0)), + utils.cast(unicode, iterator.get('librarySectionTitle')), + section['id']) + self.queue.put(queue_info) + for xml_item in iterator: if self.isCanceled(): return False - self.process_item(xml_item, section) + self.process_item(xml_item, kind[3]) except RuntimeError: LOG.error('Could not entirely process section %s', section) - return False - - - - # Populate self.updatelist and self.all_plex_ids - self.get_updatelist(all_plexmovies, - item_class, - 'add_update', - view['name'], - view['id']) - self.process_updatelist(item_class) - # Update viewstate for EVERY item - sections = (x for x in sections.SECTIONS - if x['kodi_type'] == v.KODI_TYPE_MOVIE) - for view in sections: - if self.isCanceled(): - return False - self.plex_update_watched(view['id'], item_class) - - # PROCESS DELETES ##### - if not self.repair: - # Manual sync, process deletes - with itemtypes.Movies() as movie_db: - for kodimovie in self.all_kodi_ids: - if kodimovie not in self.all_plex_ids: - movie_db.remove(kodimovie) - LOG.info("%s sync is finished.", item_class) + continue + LOG.debug('Finished processing %s', kind[0]) return True def full_library_sync(self, new_items_only=False): @@ -113,29 +95,41 @@ class FullSync(threading.Thread, common.libsync_mixin): process = [self.plex_movies, self.plex_tv_show] if state.ENABLE_MUSIC: process.append(self.plex_music) + self.queue = backgroundthread.Queue.Queue(maxsize=200) + t = process_metadata.ProcessMetadata(self.queue, self.last_sync) + t.start() + kinds = [ + ('movies', v.KODI_TYPE_MOVIE, itemtypes.Movie, False), + ('tv shows', v.KODI_TYPE_SHOW, itemtypes.Show, False), + ('tv seasons', v.KODI_TYPE_SEASON, itemtypes.Season, False), + ('tv shows', v.KODI_TYPE_SHOW, itemtypes.Show, False), + ] + try: + for kind in kinds: + if self.isCanceled() or not self.process_kind(kind): + return False - # Do the processing - for kind in process: - if self.isCanceled() or not kind(): - return False + # Let kodi update the views in any case, since we're doing a full sync + common.update_kodi_library(video=True, music=state.ENABLE_MUSIC) - # Let kodi update the views in any case, since we're doing a full sync - common.update_kodi_library(video=True, music=state.ENABLE_MUSIC) - - if utils.window('plex_scancrashed') == 'true': - # Show warning if itemtypes.py crashed at some point - utils.messageDialog(utils.lang(29999), utils.lang(39408)) - utils.window('plex_scancrashed', clear=True) - elif utils.window('plex_scancrashed') == '401': - utils.window('plex_scancrashed', clear=True) - if state.PMS_STATUS not in ('401', 'Auth'): - # Plex server had too much and returned ERROR - utils.messageDialog(utils.lang(29999), utils.lang(39409)) + if utils.window('plex_scancrashed') == 'true': + # Show warning if itemtypes.py crashed at some point + utils.messageDialog(utils.lang(29999), utils.lang(39408)) + utils.window('plex_scancrashed', clear=True) + elif utils.window('plex_scancrashed') == '401': + utils.window('plex_scancrashed', clear=True) + if state.PMS_STATUS not in ('401', 'Auth'): + # Plex server had too much and returned ERROR + utils.messageDialog(utils.lang(29999), utils.lang(39409)) + finally: + # Last element will kill the processing thread + self.queue.put(None) return True @utils.log_time def run(self): successful = False + self.last_sync = time.time() try: if self.isCanceled(): return diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py index 512dcf95..424c8b28 100644 --- a/resources/lib/library_sync/process_metadata.py +++ b/resources/lib/library_sync/process_metadata.py @@ -10,6 +10,21 @@ from .. import utils, backgroundthread LOG = getLogger('PLEX.library_sync.process_metadata') +class InitNewSection(object): + """ + Throw this into the queue used for ProcessMetadata to tell it which + Plex library section we're looking at + + context: itemtypes.Movie, itemtypes.Episode, etc. + """ + def __init__(self, context, total_number_of_items, section_name, + section_id): + self.context = context + self.total = total_number_of_items + self.name = section_name + self.id = section_id + + class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin): """ Not yet implemented for more than 1 thread - if ever. Only to be called by @@ -22,12 +37,13 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin): item_class: as used to call functions in itemtypes.py e.g. 'Movies' => itemtypes.Movies() """ - def __init__(self, queue, context, total_number_of_items): + def __init__(self, queue, last_sync): self.queue = queue - self.context = context - self.total = total_number_of_items + self.last_sync = last_sync + self.total = 0 self.current = 0 self.title = None + self.section_name = None super(ProcessMetadata, self).__init__() def update_dialog(self): @@ -38,7 +54,7 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin): except ZeroDivisionError: progress = 0 self.dialog.update(progress, - utils.lang(29999), + self.section_name, '%s/%s: %s' % (self.current, self.total, self.title)) @@ -49,32 +65,42 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin): LOG.debug('Processing thread started') self.dialog = xbmcgui.DialogProgressBG() self.dialog.create(utils.lang(39714)) - with self.context() as context: + try: + # Init with the very first library section. This will block! + section = self.queue.get() + self.queue.task_done() + if section is None: + return while self.isCanceled() is False: - # grabs item from queue - try: - xml = self.queue.get(block=False) - except backgroundthread.Queue.Empty: - xbmc.sleep(10) - continue - self.queue.task_done() - if xml is None: + if section is None: break - try: - if xml.children is not None: - context.add_update(xml[0], - viewtag=xml['view_name'], - viewid=xml['view_id'], - children=xml['children']) - else: - context.add_update(xml[0], - viewtag=xml['view_name'], - viewid=xml['view_id']) - except: - utils.ERROR(txt='process_metadata crashed', notify=True) - self.current += 1 - if self.current % 20 == 0: - self.title = utils.cast(unicode, xml[0].get('title')) - self.update_dialog() - self.dialog.close() - LOG.debug('Processing thread terminated') + self.total = section.total + self.section_name = section.name + with section.context(self.last_sync) as context: + while self.isCanceled() is False: + # grabs item from queue + try: + xml = self.queue.get(block=False) + except backgroundthread.Queue.Empty: + xbmc.sleep(20) + continue + self.queue.task_done() + if xml is InitNewSection or xml is None: + section = xml + break + try: + context.add_update(xml[0], + viewtag=section.name, + viewid=section.id, + children=xml.children) + except: + utils.ERROR(txt='process_metadata crashed', + notify=True) + self.current += 1 + if self.current % 20 == 0: + self.title = utils.cast(unicode, + xml[0].get('title')) + self.update_dialog() + finally: + self.dialog.close() + LOG.debug('Processing thread terminated') diff --git a/resources/lib/librarysync.py b/resources/lib/librarysync.py index 58946ad2..2f2139c0 100644 --- a/resources/lib/librarysync.py +++ b/resources/lib/librarysync.py @@ -224,7 +224,7 @@ class LibrarySync(Thread): kodi_fileid INTEGER, kodi_pathid INTEGER, parent_id INTEGER, - checksum INTEGER, + checksum INTEGER UNIQUE, fanart_synced INTEGER, last_sync INTEGER) ''') @@ -236,14 +236,11 @@ class LibrarySync(Thread): kodi_tagid INTEGER, sync_to_kodi INTEGER) ''') - plex_db.plexcursor.execute(''' - CREATE TABLE IF NOT EXISTS version(idVersion TEXT) - ''') plex_db.plexcursor.execute(''' CREATE TABLE IF NOT EXISTS playlists( - plex_id PRIMARY KEY, + plex_id INTEGER PRIMARY KEY ASC, plex_name TEXT, - plex_updatedat TEXT, + plex_updatedat INTEGER, kodi_path TEXT, kodi_type TEXT, kodi_hash TEXT) diff --git a/resources/lib/plex_api.py b/resources/lib/plex_api.py index 450c29df..6edf0e19 100644 --- a/resources/lib/plex_api.py +++ b/resources/lib/plex_api.py @@ -37,6 +37,7 @@ from urllib import urlencode, unquote, quote from urlparse import parse_qsl from xbmcgui import ListItem +from .utils import cast from .downloadutils import DownloadUtils as DU from . import clientinfo from . import utils @@ -103,27 +104,26 @@ class API(object): def updated_at(self): """ - Returns the last time this item was updated as unicode, e.g. - '1524739868', or None + Returns the last time this item was updated as an int, e.g. + 1524739868 or None """ - return self.item.get('updatedAt') + return cast(int, self.item.get('updatedAt')) def checksum(self): """ - Returns a string, not int. - WATCH OUT - time in Plex, not Kodi ;-) + Returns the unique int or None if this failes """ - # Include a letter to prohibit saving as an int! - return "K%s%s" % (self.plex_id(), self.item.get('updatedAt', '')) + try: + return cast(int, '%s%s' % (self.item.get('ratingKey'), + self.item.get('updatedAt'))) + except ValueError: + pass def plex_id(self): """ Returns the Plex ratingKey such as 246922 as an integer or None """ - try: - return int(self.item.get('ratingKey')) - except TypeError, ValueError: - pass + return cast(int, self.item.get('ratingKey')) def path(self, force_first_media=True, force_addon=False, direct_paths=None): diff --git a/resources/lib/plex_functions.py b/resources/lib/plex_functions.py index db4664ec..2c78e26b 100644 --- a/resources/lib/plex_functions.py +++ b/resources/lib/plex_functions.py @@ -544,7 +544,8 @@ class DownloadGen(object): def _download_chunk(self): args = { 'X-Plex-Container-Size': CONTAINERSIZE, - 'X-Plex-Container-Start': self._pos + 'X-Plex-Container-Start': self._pos, + 'sort': 'id' } self.xml = DU().downloadUrl(self._url, parameters=args) try: @@ -604,7 +605,8 @@ def DownloadChunks(url): while error_counter < 10: args = { 'X-Plex-Container-Size': CONTAINERSIZE, - 'X-Plex-Container-Start': pos + 'X-Plex-Container-Start': pos, + 'sort': 'id' } xmlpart = DU().downloadUrl(url + urlencode(args)) # If something went wrong - skip in the hope that it works next time diff --git a/resources/lib/plexdb_functions.py b/resources/lib/plexdb_functions.py index 2bf2d912..e1a0d1a4 100644 --- a/resources/lib/plexdb_functions.py +++ b/resources/lib/plexdb_functions.py @@ -214,6 +214,13 @@ class Plex_DB_Functions(): (checksum, )) return self.plexcursor.fetchone() + def update_last_sync(self, plex_id, last_sync): + """ + Fast method that updates Plex table with last_sync (an int) for plex_id + """ + self.plexcursor.execute('UPDATE plex SET last_sync = ? WHERE plex_id = ?', + (last_sync, plex_id, )) + def checksum(self, plex_type): """ Returns a list of tuples (plex_id, checksum) for plex_type @@ -228,19 +235,21 @@ class Plex_DB_Functions(): def addReference(self, plex_id, plex_type, kodi_id, kodi_type, kodi_fileid=None, kodi_pathid=None, parent_id=None, - checksum=None, view_id=None): + checksum=None, section_id=None, last_sync=None): """ Appends or replaces an entry into the plex table """ query = ''' INSERT OR REPLACE INTO plex( plex_id, kodi_id, kodi_fileid, kodi_pathid, plex_type, - kodi_type, parent_id, checksum, view_id, fanart_synced) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + kodi_type, parent_id, checksum, section_id, fanart_synced, + last_sync) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''' self.plexcursor.execute(query, (plex_id, kodi_id, kodi_fileid, kodi_pathid, plex_type, kodi_type, - parent_id, checksum, view_id, 0)) + parent_id, checksum, section_id, 0, + last_sync)) def updateReference(self, plex_id, checksum): """