Copy entire plex.db to avoid db-locks entirely
This commit is contained in:
parent
a87dfa0a7a
commit
2744b9da7e
5 changed files with 40 additions and 76 deletions
|
@ -74,6 +74,8 @@ def connect(media_type=None, wal_mode=True):
|
||||||
"""
|
"""
|
||||||
if media_type == "plex":
|
if media_type == "plex":
|
||||||
db_path = v.DB_PLEX_PATH
|
db_path = v.DB_PLEX_PATH
|
||||||
|
elif media_type == 'plex-copy':
|
||||||
|
db_path = v.DB_PLEX_COPY_PATH
|
||||||
elif media_type == "music":
|
elif media_type == "music":
|
||||||
db_path = v.DB_MUSIC_PATH
|
db_path = v.DB_MUSIC_PATH
|
||||||
elif media_type == "texture":
|
elif media_type == "texture":
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from __future__ import absolute_import, division, unicode_literals
|
from __future__ import absolute_import, division, unicode_literals
|
||||||
from logging import getLogger
|
from logging import getLogger
|
||||||
import Queue
|
|
||||||
from collections import deque
|
|
||||||
|
|
||||||
from . import common
|
from . import common
|
||||||
from ..plex_db import PlexDB
|
from ..plex_db import PlexDB
|
||||||
|
@ -11,92 +9,43 @@ from .. import backgroundthread, app
|
||||||
LOG = getLogger('PLEX.sync.fill_metadata_queue')
|
LOG = getLogger('PLEX.sync.fill_metadata_queue')
|
||||||
|
|
||||||
|
|
||||||
def batch_sizes():
|
|
||||||
"""
|
|
||||||
Increase batch sizes in order to get download threads for an items xml
|
|
||||||
metadata started soon. Corresponds to batch sizes when downloading lists
|
|
||||||
of items from the PMS ('limitindex' in the PKC settings)
|
|
||||||
"""
|
|
||||||
for i in (50, 100, 200, 400):
|
|
||||||
yield i
|
|
||||||
while True:
|
|
||||||
yield 1000
|
|
||||||
|
|
||||||
|
|
||||||
class FillMetadataQueue(common.LibrarySyncMixin,
|
class FillMetadataQueue(common.LibrarySyncMixin,
|
||||||
backgroundthread.KillableThread, ):
|
backgroundthread.KillableThread, ):
|
||||||
"""
|
"""
|
||||||
Threaded download of Plex XML metadata for a certain library item.
|
Threaded download of Plex XML metadata for a certain library item.
|
||||||
Fills the queue with the downloaded etree XML objects
|
Fills the queue with the downloaded etree XML objects. Will use a COPIED
|
||||||
|
plex.db file (plex-copy.db) in order to read much faster without the
|
||||||
Input:
|
writing thread stalling
|
||||||
queue Queue.Queue() object where this thread will store
|
|
||||||
the downloaded metadata XMLs as etree objects
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, repair, section_queue, get_metadata_queue):
|
def __init__(self, repair, section_queue, get_metadata_queue):
|
||||||
self.repair = repair
|
self.repair = repair
|
||||||
self.section_queue = section_queue
|
self.section_queue = section_queue
|
||||||
self.get_metadata_queue = get_metadata_queue
|
self.get_metadata_queue = get_metadata_queue
|
||||||
self.count = 0
|
|
||||||
self.batch_size = batch_sizes()
|
|
||||||
super(FillMetadataQueue, self).__init__()
|
super(FillMetadataQueue, self).__init__()
|
||||||
|
|
||||||
def _loop(self, section, items):
|
|
||||||
while items and not self.should_cancel():
|
|
||||||
try:
|
|
||||||
with PlexDB(lock=False) as plexdb:
|
|
||||||
while items and not self.should_cancel():
|
|
||||||
last, plex_id, checksum = items.popleft()
|
|
||||||
if (not self.repair and
|
|
||||||
plexdb.checksum(plex_id, section.plex_type) == checksum):
|
|
||||||
continue
|
|
||||||
if last:
|
|
||||||
# We might have received LESS items from the PMS
|
|
||||||
# than anticipated. Ensures that our queues finish
|
|
||||||
section.number_of_items = self.count + 1
|
|
||||||
self.get_metadata_queue.put((self.count, plex_id, section),
|
|
||||||
block=False)
|
|
||||||
self.count += 1
|
|
||||||
except Queue.Full:
|
|
||||||
# Close the DB for speed!
|
|
||||||
LOG.debug('Queue full')
|
|
||||||
self.sleep(5)
|
|
||||||
while not self.should_cancel():
|
|
||||||
try:
|
|
||||||
self.get_metadata_queue.put((self.count, plex_id, section),
|
|
||||||
block=False)
|
|
||||||
except Queue.Full:
|
|
||||||
LOG.debug('Queue fuller')
|
|
||||||
self.sleep(2)
|
|
||||||
else:
|
|
||||||
self.count += 1
|
|
||||||
break
|
|
||||||
|
|
||||||
def _process_section(self, section):
|
def _process_section(self, section):
|
||||||
# Initialize only once to avoid loosing the last value before we're
|
# Initialize only once to avoid loosing the last value before we're
|
||||||
# breaking the for loop
|
# breaking the for loop
|
||||||
iterator = common.tag_last(section.iterator)
|
LOG.debug('Process section %s with %s items',
|
||||||
last = True
|
section, section.number_of_items)
|
||||||
self.count = 0
|
count = 0
|
||||||
while not self.should_cancel():
|
with PlexDB(lock=False, copy=True) as plexdb:
|
||||||
batch_size = next(self.batch_size)
|
for xml in section.iterator:
|
||||||
LOG.debug('Process batch of size %s with count %s for section %s',
|
if self.should_cancel():
|
||||||
batch_size, self.count, section)
|
break
|
||||||
# Iterator will block for download - let's not do that when the
|
|
||||||
# DB connection is open
|
|
||||||
items = deque()
|
|
||||||
for i, (last, xml) in enumerate(iterator):
|
|
||||||
plex_id = int(xml.get('ratingKey'))
|
plex_id = int(xml.get('ratingKey'))
|
||||||
checksum = int('{}{}'.format(
|
checksum = int('{}{}'.format(
|
||||||
plex_id,
|
plex_id,
|
||||||
xml.get('updatedAt',
|
xml.get('updatedAt',
|
||||||
xml.get('addedAt', '1541572987'))))
|
xml.get('addedAt', '1541572987'))))
|
||||||
items.append((last, plex_id, checksum))
|
if (not self.repair and
|
||||||
if i == batch_size:
|
plexdb.checksum(plex_id, section.plex_type) == checksum):
|
||||||
break
|
continue
|
||||||
self._loop(section, items)
|
self.get_metadata_queue.put((count, plex_id, section))
|
||||||
if last:
|
count += 1
|
||||||
break
|
# We might have received LESS items from the PMS than anticipated.
|
||||||
|
# Ensures that our queues finish
|
||||||
|
section.number_of_items = count
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
LOG.debug('Starting %s thread', self.__class__.__name__)
|
LOG.debug('Starting %s thread', self.__class__.__name__)
|
||||||
|
|
|
@ -11,7 +11,7 @@ from .fill_metadata_queue import FillMetadataQueue
|
||||||
from .process_metadata import ProcessMetadataThread
|
from .process_metadata import ProcessMetadataThread
|
||||||
from . import common, sections
|
from . import common, sections
|
||||||
from .. import utils, timing, backgroundthread, variables as v, app
|
from .. import utils, timing, backgroundthread, variables as v, app
|
||||||
from .. import plex_functions as PF, itemtypes
|
from .. import plex_functions as PF, itemtypes, path_ops
|
||||||
|
|
||||||
if common.PLAYLIST_SYNC_ENABLED:
|
if common.PLAYLIST_SYNC_ENABLED:
|
||||||
from .. import playlists
|
from .. import playlists
|
||||||
|
@ -77,6 +77,16 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
|
||||||
self.dialog.close()
|
self.dialog.close()
|
||||||
self.dialog = None
|
self.dialog = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def copy_plex_db():
|
||||||
|
"""
|
||||||
|
Takes the current plex.db file and copies it to plex-copy.db
|
||||||
|
This will allow us to have "concurrent" connections during adding/
|
||||||
|
updating items, increasing sync speed tremendously.
|
||||||
|
Using the same DB with e.g. WAL mode did not really work out...
|
||||||
|
"""
|
||||||
|
path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH)
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def processing_loop_new_and_changed_items(self):
|
def processing_loop_new_and_changed_items(self):
|
||||||
LOG.debug('Start working')
|
LOG.debug('Start working')
|
||||||
|
@ -267,6 +277,9 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
|
||||||
LOG.info('Running library sync with repair=%s', self.repair)
|
LOG.info('Running library sync with repair=%s', self.repair)
|
||||||
try:
|
try:
|
||||||
self.run_full_library_sync()
|
self.run_full_library_sync()
|
||||||
|
except Exception:
|
||||||
|
utils.ERROR(notify=True)
|
||||||
|
self.successful = False
|
||||||
finally:
|
finally:
|
||||||
app.APP.deregister_thread(self)
|
app.APP.deregister_thread(self)
|
||||||
LOG.info('Library sync done. successful: %s', self.successful)
|
LOG.info('Library sync done. successful: %s', self.successful)
|
||||||
|
@ -277,9 +290,7 @@ class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
|
||||||
# Get latest Plex libraries and build playlist and video node files
|
# Get latest Plex libraries and build playlist and video node files
|
||||||
if self.should_cancel() or not sections.sync_from_pms(self):
|
if self.should_cancel() or not sections.sync_from_pms(self):
|
||||||
return
|
return
|
||||||
if self.should_cancel():
|
self.copy_plex_db()
|
||||||
self.successful = False
|
|
||||||
return
|
|
||||||
self.full_library_sync()
|
self.full_library_sync()
|
||||||
finally:
|
finally:
|
||||||
common.update_kodi_library(video=True, music=True)
|
common.update_kodi_library(video=True, music=True)
|
||||||
|
|
|
@ -20,18 +20,19 @@ SUPPORTED_KODI_TYPES = (
|
||||||
|
|
||||||
class PlexDBBase(object):
|
class PlexDBBase(object):
|
||||||
"""
|
"""
|
||||||
Plex database methods used for all types of items
|
Plex database methods used for all types of items.
|
||||||
"""
|
"""
|
||||||
def __init__(self, plexconn=None, lock=True):
|
def __init__(self, plexconn=None, lock=True, copy=False):
|
||||||
# Allows us to use this class with a cursor instead of context mgr
|
# Allows us to use this class with a cursor instead of context mgr
|
||||||
self.plexconn = plexconn
|
self.plexconn = plexconn
|
||||||
self.cursor = self.plexconn.cursor() if self.plexconn else None
|
self.cursor = self.plexconn.cursor() if self.plexconn else None
|
||||||
self.lock = lock
|
self.lock = lock
|
||||||
|
self.copy = copy
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
if self.lock:
|
if self.lock:
|
||||||
PLEXDB_LOCK.acquire()
|
PLEXDB_LOCK.acquire()
|
||||||
self.plexconn = db.connect('plex')
|
self.plexconn = db.connect('plex-copy' if self.copy else 'plex')
|
||||||
self.cursor = self.plexconn.cursor()
|
self.cursor = self.plexconn.cursor()
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
|
|
@ -127,6 +127,7 @@ DB_MUSIC_PATH = None
|
||||||
DB_TEXTURE_VERSION = None
|
DB_TEXTURE_VERSION = None
|
||||||
DB_TEXTURE_PATH = None
|
DB_TEXTURE_PATH = None
|
||||||
DB_PLEX_PATH = try_decode(xbmc.translatePath("special://database/plex.db"))
|
DB_PLEX_PATH = try_decode(xbmc.translatePath("special://database/plex.db"))
|
||||||
|
DB_PLEX_COPY_PATH = try_decode(xbmc.translatePath("special://database/plex-copy.db"))
|
||||||
|
|
||||||
EXTERNAL_SUBTITLE_TEMP_PATH = try_decode(xbmc.translatePath(
|
EXTERNAL_SUBTITLE_TEMP_PATH = try_decode(xbmc.translatePath(
|
||||||
"special://profile/addon_data/%s/temp/" % ADDON_ID))
|
"special://profile/addon_data/%s/temp/" % ADDON_ID))
|
||||||
|
|
Loading…
Reference in a new issue