Merge pull request #1085 from croneter/optimize-db

Optimize the new sync process and fix some bugs that were introduced
This commit is contained in:
croneter 2019-12-13 16:54:48 +01:00 committed by GitHub
commit fd80bc9cf3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 110 additions and 148 deletions

View file

@ -6,6 +6,7 @@ from functools import wraps
from . import variables as v, app from . import variables as v, app
DB_WRITE_ATTEMPTS = 100 DB_WRITE_ATTEMPTS = 100
DB_CONNECTION_TIMEOUT = 10
class LockedDatabase(Exception): class LockedDatabase(Exception):
@ -52,26 +53,21 @@ def catch_operationalerrors(method):
return wrapper return wrapper
def _initial_db_connection_setup(conn, wal_mode): def _initial_db_connection_setup(conn):
""" """
Set-up DB e.g. for WAL journal mode, if that hasn't already been done Set-up DB e.g. for WAL journal mode, if that hasn't already been done
before. Also start a transaction before. Also start a transaction
""" """
if wal_mode: conn.execute('PRAGMA journal_mode = WAL;')
pass conn.execute('PRAGMA cache_size = -8000;')
# conn.execute('PRAGMA journal_mode=WAL;') conn.execute('PRAGMA synchronous = NORMAL;')
# conn.execute('PRAGMA cache_size = -8000;')
# conn.execute('PRAGMA synchronous=NORMAL;')
conn.execute('BEGIN') conn.execute('BEGIN')
def connect(media_type=None, wal_mode=True): def connect(media_type=None):
""" """
Open a connection to the Kodi database. Open a connection to the Kodi database.
media_type: 'video' (standard if not passed), 'plex', 'music', 'texture' media_type: 'video' (standard if not passed), 'plex', 'music', 'texture'
Pass wal_mode=False if you want the standard (and slower) sqlite
journal_mode, e.g. when wiping entire tables. Useful if you do NOT want
concurrent access to DB for both PKC and Kodi
""" """
if media_type == "plex": if media_type == "plex":
db_path = v.DB_PLEX_PATH db_path = v.DB_PLEX_PATH
@ -83,11 +79,13 @@ def connect(media_type=None, wal_mode=True):
db_path = v.DB_TEXTURE_PATH db_path = v.DB_TEXTURE_PATH
else: else:
db_path = v.DB_VIDEO_PATH db_path = v.DB_VIDEO_PATH
conn = sqlite3.connect(db_path, timeout=30.0) conn = sqlite3.connect(db_path,
timeout=DB_CONNECTION_TIMEOUT,
isolation_level=None)
attempts = DB_WRITE_ATTEMPTS attempts = DB_WRITE_ATTEMPTS
while True: while True:
try: try:
_initial_db_connection_setup(conn, wal_mode) _initial_db_connection_setup(conn)
except sqlite3.OperationalError as err: except sqlite3.OperationalError as err:
if 'database is locked' not in err: if 'database is locked' not in err:
# Not an error we want to catch, so reraise it # Not an error we want to catch, so reraise it
@ -98,7 +96,7 @@ def connect(media_type=None, wal_mode=True):
raise LockedDatabase('Database is locked') raise LockedDatabase('Database is locked')
if app.APP.monitor.waitForAbort(0.05): if app.APP.monitor.waitForAbort(0.05):
# PKC needs to quit # PKC needs to quit
return raise LockedDatabase('Database was locked and we need to exit')
else: else:
break break
return conn return conn

View file

@ -62,7 +62,7 @@ def setup_kodi_default_entries():
def reset_cached_images(): def reset_cached_images():
LOG.info('Resetting cached artwork') LOG.info('Resetting cached artwork')
LOG.debug('Resetting the Kodi texture DB') LOG.debug('Resetting the Kodi texture DB')
with KodiTextureDB(wal_mode=False) as kodidb: with KodiTextureDB() as kodidb:
kodidb.wipe() kodidb.wipe()
LOG.debug('Deleting all cached image files') LOG.debug('Deleting all cached image files')
path = path_ops.translate_path('special://thumbnails/') path = path_ops.translate_path('special://thumbnails/')
@ -91,11 +91,11 @@ def wipe_dbs(music=True):
""" """
LOG.warn('Wiping Kodi databases!') LOG.warn('Wiping Kodi databases!')
LOG.info('Wiping Kodi video database') LOG.info('Wiping Kodi video database')
with KodiVideoDB(wal_mode=False) as kodidb: with KodiVideoDB() as kodidb:
kodidb.wipe() kodidb.wipe()
if music: if music:
LOG.info('Wiping Kodi music database') LOG.info('Wiping Kodi music database')
with KodiMusicDB(wal_mode=False) as kodidb: with KodiMusicDB() as kodidb:
kodidb.wipe() kodidb.wipe()
reset_cached_images() reset_cached_images()
setup_kodi_default_entries() setup_kodi_default_entries()

View file

@ -15,11 +15,9 @@ class KodiDBBase(object):
Kodi database methods used for all types of items Kodi database methods used for all types of items
""" """
def __init__(self, texture_db=False, kodiconn=None, artconn=None, def __init__(self, texture_db=False, kodiconn=None, artconn=None,
lock=True, wal_mode=True): lock=True):
""" """
Allows direct use with a cursor instead of context mgr Allows direct use with a cursor instead of context mgr
Pass wal_mode=False if you want the standard sqlite journal_mode, e.g.
when wiping entire tables
""" """
self._texture_db = texture_db self._texture_db = texture_db
self.lock = lock self.lock = lock
@ -27,14 +25,13 @@ class KodiDBBase(object):
self.cursor = self.kodiconn.cursor() if self.kodiconn else None self.cursor = self.kodiconn.cursor() if self.kodiconn else None
self.artconn = artconn self.artconn = artconn
self.artcursor = self.artconn.cursor() if self.artconn else None self.artcursor = self.artconn.cursor() if self.artconn else None
self.wal_mode = wal_mode
def __enter__(self): def __enter__(self):
if self.lock: if self.lock:
KODIDB_LOCK.acquire() KODIDB_LOCK.acquire()
self.kodiconn = db.connect(self.db_kind, self.wal_mode) self.kodiconn = db.connect(self.db_kind)
self.cursor = self.kodiconn.cursor() self.cursor = self.kodiconn.cursor()
self.artconn = db.connect('texture', self.wal_mode) if self._texture_db \ self.artconn = db.connect('texture') if self._texture_db \
else None else None
self.artcursor = self.artconn.cursor() if self._texture_db else None self.artcursor = self.artconn.cursor() if self._texture_db else None
return self return self

View file

@ -38,7 +38,8 @@ class KodiVideoDB(common.KodiDBBase):
For some reason, Kodi ignores this if done via itemtypes while e.g. For some reason, Kodi ignores this if done via itemtypes while e.g.
adding or updating items. (addPath method does NOT work) adding or updating items. (addPath method does NOT work)
""" """
path_id = self.get_path(MOVIE_PATH) for path, kind in ((MOVIE_PATH, 'movies'), (SHOW_PATH, 'tvshows')):
path_id = self.get_path(path)
if path_id is None: if path_id is None:
query = ''' query = '''
INSERT INTO path(strPath, INSERT INTO path(strPath,
@ -48,24 +49,8 @@ class KodiVideoDB(common.KodiDBBase):
exclude) exclude)
VALUES (?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?)
''' '''
self.cursor.execute(query, (MOVIE_PATH, self.cursor.execute(query, (path,
'movies', kind,
'metadata.local',
1,
0))
# And TV shows
path_id = self.get_path(SHOW_PATH)
if path_id is None:
query = '''
INSERT INTO path(strPath,
strContent,
strScraper,
noUpdate,
exclude)
VALUES (?, ?, ?, ?, ?)
'''
self.cursor.execute(query, (SHOW_PATH,
'tvshows',
'metadata.local', 'metadata.local',
1, 1,
0)) 0))

View file

@ -1,10 +1,13 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
import xbmc import xbmc
from .. import utils, app, variables as v from .. import utils, app, variables as v
LOG = getLogger('PLEX.sync')
PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and
utils.settings('enablePlaylistSync') == 'true') utils.settings('enablePlaylistSync') == 'true')
@ -22,6 +25,18 @@ class LibrarySyncMixin(object):
""" """
return self.should_cancel() return self.should_cancel()
def run(self):
app.APP.register_thread(self)
LOG.debug('##===--- Starting %s ---===##', self.__class__.__name__)
try:
self._run()
except Exception as err:
LOG.error('Exception encountered: %s', err)
utils.ERROR(notify=True)
finally:
app.APP.deregister_thread(self)
LOG.debug('##===--- %s Stopped ---===##', self.__class__.__name__)
def update_kodi_library(video=True, music=True): def update_kodi_library(video=True, music=True):
""" """

View file

@ -1,21 +1,23 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
from Queue import Empty
from . import common from . import common
from ..plex_db import PlexDB from ..plex_db import PlexDB
from .. import backgroundthread, app from .. import backgroundthread
LOG = getLogger('PLEX.sync.fill_metadata_queue') LOG = getLogger('PLEX.sync.fill_metadata_queue')
QUEUE_TIMEOUT = 10 # seconds
class FillMetadataQueue(common.LibrarySyncMixin, class FillMetadataQueue(common.LibrarySyncMixin,
backgroundthread.KillableThread, ): backgroundthread.KillableThread):
""" """
Threaded download of Plex XML metadata for a certain library item. Determines which plex_ids we need to sync and puts these ids in a separate
Fills the queue with the downloaded etree XML objects. Will use a COPIED queue. Will use a COPIED plex.db file (plex-copy.db) in order to read much
plex.db file (plex-copy.db) in order to read much faster without the faster without the writing thread stalling
writing thread stalling
""" """
def __init__(self, repair, section_queue, get_metadata_queue): def __init__(self, repair, section_queue, get_metadata_queue):
self.repair = repair self.repair = repair
@ -41,27 +43,25 @@ class FillMetadataQueue(common.LibrarySyncMixin,
if (not self.repair and if (not self.repair and
plexdb.checksum(plex_id, section.plex_type) == checksum): plexdb.checksum(plex_id, section.plex_type) == checksum):
continue continue
self.get_metadata_queue.put((count, plex_id, section)) try:
self.get_metadata_queue.put((count, plex_id, section),
timeout=QUEUE_TIMEOUT)
except Empty:
LOG.error('Putting %s in get_metadata_queue timed out - '
'aborting sync now', plex_id)
section.sync_successful = False
break
count += 1 count += 1
# We might have received LESS items from the PMS than anticipated. # We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish # Ensures that our queues finish
section.number_of_items = count section.number_of_items = count
def run(self): def _run(self):
LOG.debug('Starting %s thread', self.__class__.__name__)
app.APP.register_thread(self)
try:
while not self.should_cancel(): while not self.should_cancel():
section = self.section_queue.get() section = self.section_queue.get()
self.section_queue.task_done() self.section_queue.task_done()
if section is None: if section is None:
break break
self._process_section(section) self._process_section(section)
except Exception:
from .. import utils
utils.ERROR(notify=True)
finally:
# Signal the download metadata threads to stop with a sentinel # Signal the download metadata threads to stop with a sentinel
self.get_metadata_queue.put(None) self.get_metadata_queue.put(None)
app.APP.deregister_thread(self)
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)

View file

@ -18,12 +18,13 @@ if common.PLAYLIST_SYNC_ENABLED:
LOG = getLogger('PLEX.sync.full_sync') LOG = getLogger('PLEX.sync.full_sync')
# How many items will be put through the processing chain at once? DELETION_BATCH_SIZE = 250
BATCH_SIZE = 250 PLAYSTATE_BATCH_SIZE = 5000
# Size of queue for xmls to be downloaded from PMS for/and before processing
QUEUE_BUFFER = 50 # Max. number of plex_ids held in memory for later processing
BACKLOG_QUEUE_SIZE = 10000
# Max number of xmls held in memory # Max number of xmls held in memory
MAX_QUEUE_SIZE = 500 XML_QUEUE_SIZE = 500
# Safety margin to filter PMS items - how many seconds to look into the past? # Safety margin to filter PMS items - how many seconds to look into the past?
UPDATED_AT_SAFETY = 60 * 5 UPDATED_AT_SAFETY = 60 * 5
LAST_VIEWED_AT_SAFETY = 60 * 5 LAST_VIEWED_AT_SAFETY = 60 * 5
@ -46,8 +47,8 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
self.dialog = None self.dialog = None
self.section_queue = Queue.Queue() self.section_queue = Queue.Queue()
self.get_metadata_queue = Queue.Queue(maxsize=5000) self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
self.processing_queue = backgroundthread.ProcessingQueue(maxsize=500) self.processing_queue = backgroundthread.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
self.current_time = timing.plex_now() self.current_time = timing.plex_now()
self.last_section = sections.Section() self.last_section = sections.Section()
@ -123,35 +124,29 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
LOG.debug('Processing %s playstates for library section %s', LOG.debug('Processing %s playstates for library section %s',
section.number_of_items, section) section.number_of_items, section)
try: try:
iterator = section.iterator with section.context(self.current_time) as context:
iterator = common.tag_last(iterator) for xml in section.iterator:
last = True
while not self.should_cancel():
with section.context(self.current_time) as itemtype:
for last, xml_item in iterator:
section.count += 1 section.count += 1
if not itemtype.update_userdata(xml_item, section.plex_type): if not context.update_userdata(xml, section.plex_type):
# Somehow did not sync this item yet # Somehow did not sync this item yet
itemtype.add_update(xml_item, context.add_update(xml,
section_name=section.name, section_name=section.name,
section_id=section.section_id) section_id=section.section_id)
itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']), context.plexdb.update_last_sync(int(xml.attrib['ratingKey']),
section.plex_type, section.plex_type,
self.current_time) self.current_time)
self.update_progressbar(section, '', section.count) self.update_progressbar(section, '', section.count - 1)
if section.count % (10 * BATCH_SIZE) == 0: if section.count % PLAYSTATE_BATCH_SIZE == 0:
break context.commit()
if last:
break
except RuntimeError: except RuntimeError:
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
self.successful = False self.successful = False
def get_generators(self, kinds, queue, all_items): def threaded_get_generators(self, kinds, queue, all_items):
""" """
Getting iterators is costly, so let's do it asynchronously Getting iterators is costly, so let's do it in a dedicated thread
""" """
LOG.debug('Start get_generators') LOG.debug('Start threaded_get_generators')
try: try:
for kind in kinds: for kind in kinds:
for section in (x for x in app.SYNC.sections for section in (x for x in app.SYNC.sections
@ -189,7 +184,7 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
utils.ERROR(notify=True) utils.ERROR(notify=True)
finally: finally:
queue.put(None) queue.put(None)
LOG.debug('Exiting get_generators') LOG.debug('Exiting threaded_get_generators')
def full_library_sync(self): def full_library_sync(self):
kinds = [ kinds = [
@ -205,7 +200,10 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
]) ])
# ADD NEW ITEMS # ADD NEW ITEMS
# We need to enforce syncing e.g. show before season before episode # We need to enforce syncing e.g. show before season before episode
self.get_generators(kinds, self.section_queue, False) thread = backgroundthread.KillableThread(
target=self.threaded_get_generators,
args=(kinds, self.section_queue, False))
thread.start()
# Do the heavy lifting # Do the heavy lifting
self.processing_loop_new_and_changed_items() self.processing_loop_new_and_changed_items()
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
@ -237,7 +235,10 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
# Close the progress indicator dialog # Close the progress indicator dialog
self.dialog.close() self.dialog.close()
self.dialog = None self.dialog = None
self.get_generators(kinds, self.section_queue, True) thread = backgroundthread.KillableThread(
target=self.threaded_get_generators,
args=(kinds, self.section_queue, True))
thread.start()
self.processing_loop_playstates() self.processing_loop_playstates()
if self.should_cancel() or not self.successful: if self.should_cancel() or not self.successful:
return return
@ -263,29 +264,17 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
plex_ids = list( plex_ids = list(
ctx.plexdb.plex_id_by_last_sync(plex_type, ctx.plexdb.plex_id_by_last_sync(plex_type,
self.current_time, self.current_time,
BATCH_SIZE)) DELETION_BATCH_SIZE))
for plex_id in plex_ids: for plex_id in plex_ids:
if self.should_cancel(): if self.should_cancel():
return return
ctx.remove(plex_id, plex_type) ctx.remove(plex_id, plex_type)
if len(plex_ids) < BATCH_SIZE: if len(plex_ids) < DELETION_BATCH_SIZE:
break break
LOG.debug('Done looking for items to delete') LOG.debug('Done looking for items to delete')
def run(self):
app.APP.register_thread(self)
LOG.info('Running library sync with repair=%s', self.repair)
try:
self.run_full_library_sync()
except Exception:
utils.ERROR(notify=True)
self.successful = False
finally:
app.APP.deregister_thread(self)
LOG.info('Library sync done. successful: %s', self.successful)
@utils.log_time @utils.log_time
def run_full_library_sync(self): def _run(self):
try: try:
# Get latest Plex libraries and build playlist and video node files # Get latest Plex libraries and build playlist and video node files
if self.should_cancel() or not sections.sync_from_pms(self): if self.should_cancel() or not sections.sync_from_pms(self):

View file

@ -5,7 +5,6 @@ from logging import getLogger
from . import common from . import common
from ..plex_api import API from ..plex_api import API
from .. import backgroundthread, plex_functions as PF, utils, variables as v from .. import backgroundthread, plex_functions as PF, utils, variables as v
from .. import app
LOG = getLogger('PLEX.sync.get_metadata') LOG = getLogger('PLEX.sync.get_metadata')
LOCK = backgroundthread.threading.Lock() LOCK = backgroundthread.threading.Lock()
@ -69,15 +68,6 @@ class GetMetadataThread(common.LibrarySyncMixin,
# Add a "dummy" item so we're not skipping a beat # Add a "dummy" item so we're not skipping a beat
self.processing_queue.put((count, {'section': section, 'xml': None})) self.processing_queue.put((count, {'section': section, 'xml': None}))
def run(self):
LOG.debug('Starting %s thread', self.__class__.__name__)
app.APP.register_thread(self)
try:
self._run()
finally:
app.APP.deregister_thread(self)
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
def _run(self): def _run(self):
while True: while True:
item = self.get_metadata_queue.get() item = self.get_metadata_queue.get()

View file

@ -57,18 +57,6 @@ class ProcessMetadataThread(common.LibrarySyncMixin,
self.processing_queue.task_done() self.processing_queue.task_done()
return item return item
def run(self):
LOG.debug('Starting %s thread', self.__class__.__name__)
app.APP.register_thread(self)
try:
self._run()
except Exception:
from .. import utils
utils.ERROR(notify=True)
finally:
app.APP.deregister_thread(self)
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
def _run(self): def _run(self):
# There are 2 sentinels: None for aborting/ending this thread, the dict # There are 2 sentinels: None for aborting/ending this thread, the dict
# {'section': section, 'xml': None} for skipped/invalid items # {'section': section, 'xml': None} for skipped/invalid items