diff --git a/resources/lib/db.py b/resources/lib/db.py index 30f91315..fc622ae5 100644 --- a/resources/lib/db.py +++ b/resources/lib/db.py @@ -74,6 +74,8 @@ def connect(media_type=None, wal_mode=True): """ if media_type == "plex": db_path = v.DB_PLEX_PATH + elif media_type == 'plex-copy': + db_path = v.DB_PLEX_COPY_PATH elif media_type == "music": db_path = v.DB_MUSIC_PATH elif media_type == "texture": diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py index c92f7a96..7ed361b0 100644 --- a/resources/lib/library_sync/fill_metadata_queue.py +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -1,8 +1,6 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, division, unicode_literals from logging import getLogger -import Queue -from collections import deque from . import common from ..plex_db import PlexDB @@ -11,92 +9,43 @@ from .. import backgroundthread, app LOG = getLogger('PLEX.sync.fill_metadata_queue') -def batch_sizes(): - """ - Increase batch sizes in order to get download threads for an items xml - metadata started soon. Corresponds to batch sizes when downloading lists - of items from the PMS ('limitindex' in the PKC settings) - """ - for i in (50, 100, 200, 400): - yield i - while True: - yield 1000 - - class FillMetadataQueue(common.LibrarySyncMixin, backgroundthread.KillableThread, ): """ Threaded download of Plex XML metadata for a certain library item. - Fills the queue with the downloaded etree XML objects - - Input: - queue Queue.Queue() object where this thread will store - the downloaded metadata XMLs as etree objects + Fills the queue with the downloaded etree XML objects. Will use a COPIED + plex.db file (plex-copy.db) in order to read much faster without the + writing thread stalling """ def __init__(self, repair, section_queue, get_metadata_queue): self.repair = repair self.section_queue = section_queue self.get_metadata_queue = get_metadata_queue - self.count = 0 - self.batch_size = batch_sizes() super(FillMetadataQueue, self).__init__() - def _loop(self, section, items): - while items and not self.should_cancel(): - try: - with PlexDB(lock=False) as plexdb: - while items and not self.should_cancel(): - last, plex_id, checksum = items.popleft() - if (not self.repair and - plexdb.checksum(plex_id, section.plex_type) == checksum): - continue - if last: - # We might have received LESS items from the PMS - # than anticipated. Ensures that our queues finish - section.number_of_items = self.count + 1 - self.get_metadata_queue.put((self.count, plex_id, section), - block=False) - self.count += 1 - except Queue.Full: - # Close the DB for speed! - LOG.debug('Queue full') - self.sleep(5) - while not self.should_cancel(): - try: - self.get_metadata_queue.put((self.count, plex_id, section), - block=False) - except Queue.Full: - LOG.debug('Queue fuller') - self.sleep(2) - else: - self.count += 1 - break - def _process_section(self, section): # Initialize only once to avoid loosing the last value before we're # breaking the for loop - iterator = common.tag_last(section.iterator) - last = True - self.count = 0 - while not self.should_cancel(): - batch_size = next(self.batch_size) - LOG.debug('Process batch of size %s with count %s for section %s', - batch_size, self.count, section) - # Iterator will block for download - let's not do that when the - # DB connection is open - items = deque() - for i, (last, xml) in enumerate(iterator): + LOG.debug('Process section %s with %s items', + section, section.number_of_items) + count = 0 + with PlexDB(lock=False, copy=True) as plexdb: + for xml in section.iterator: + if self.should_cancel(): + break plex_id = int(xml.get('ratingKey')) checksum = int('{}{}'.format( plex_id, xml.get('updatedAt', xml.get('addedAt', '1541572987')))) - items.append((last, plex_id, checksum)) - if i == batch_size: - break - self._loop(section, items) - if last: - break + if (not self.repair and + plexdb.checksum(plex_id, section.plex_type) == checksum): + continue + self.get_metadata_queue.put((count, plex_id, section)) + count += 1 + # We might have received LESS items from the PMS than anticipated. + # Ensures that our queues finish + section.number_of_items = count def run(self): LOG.debug('Starting %s thread', self.__class__.__name__) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index d5706643..0312ec21 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -11,7 +11,7 @@ from .fill_metadata_queue import FillMetadataQueue from .process_metadata import ProcessMetadataThread from . import common, sections from .. import utils, timing, backgroundthread, variables as v, app -from .. import plex_functions as PF, itemtypes +from .. import plex_functions as PF, itemtypes, path_ops if common.PLAYLIST_SYNC_ENABLED: from .. import playlists @@ -77,6 +77,16 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): self.dialog.close() self.dialog = None + @staticmethod + def copy_plex_db(): + """ + Takes the current plex.db file and copies it to plex-copy.db + This will allow us to have "concurrent" connections during adding/ + updating items, increasing sync speed tremendously. + Using the same DB with e.g. WAL mode did not really work out... + """ + path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH) + @utils.log_time def processing_loop_new_and_changed_items(self): LOG.debug('Start working') @@ -267,6 +277,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): LOG.info('Running library sync with repair=%s', self.repair) try: self.run_full_library_sync() + except Exception: + utils.ERROR(notify=True) + self.successful = False finally: app.APP.deregister_thread(self) LOG.info('Library sync done. successful: %s', self.successful) @@ -277,9 +290,7 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): # Get latest Plex libraries and build playlist and video node files if self.should_cancel() or not sections.sync_from_pms(self): return - if self.should_cancel(): - self.successful = False - return + self.copy_plex_db() self.full_library_sync() finally: common.update_kodi_library(video=True, music=True) diff --git a/resources/lib/plex_db/common.py b/resources/lib/plex_db/common.py index f150fa38..6a70cb42 100644 --- a/resources/lib/plex_db/common.py +++ b/resources/lib/plex_db/common.py @@ -20,18 +20,19 @@ SUPPORTED_KODI_TYPES = ( class PlexDBBase(object): """ - Plex database methods used for all types of items + Plex database methods used for all types of items. """ - def __init__(self, plexconn=None, lock=True): + def __init__(self, plexconn=None, lock=True, copy=False): # Allows us to use this class with a cursor instead of context mgr self.plexconn = plexconn self.cursor = self.plexconn.cursor() if self.plexconn else None self.lock = lock + self.copy = copy def __enter__(self): if self.lock: PLEXDB_LOCK.acquire() - self.plexconn = db.connect('plex') + self.plexconn = db.connect('plex-copy' if self.copy else 'plex') self.cursor = self.plexconn.cursor() return self diff --git a/resources/lib/variables.py b/resources/lib/variables.py index b36ba29c..2ccc241d 100644 --- a/resources/lib/variables.py +++ b/resources/lib/variables.py @@ -127,6 +127,7 @@ DB_MUSIC_PATH = None DB_TEXTURE_VERSION = None DB_TEXTURE_PATH = None DB_PLEX_PATH = try_decode(xbmc.translatePath("special://database/plex.db")) +DB_PLEX_COPY_PATH = try_decode(xbmc.translatePath("special://database/plex-copy.db")) EXTERNAL_SUBTITLE_TEMP_PATH = try_decode(xbmc.translatePath( "special://profile/addon_data/%s/temp/" % ADDON_ID))