Rewire library sync
This commit is contained in:
parent
f6415ae444
commit
d33ba7e502
8 changed files with 268 additions and 214 deletions
|
@ -513,6 +513,11 @@ msgctxt "#30522"
|
||||||
msgid "Force transcode h265/HEVC"
|
msgid "Force transcode h265/HEVC"
|
||||||
msgstr ""
|
msgstr ""
|
||||||
|
|
||||||
|
# PKC Settings - Sync Options
|
||||||
|
msgctxt "#30523"
|
||||||
|
msgid "Also show sync progress for playstate and user data"
|
||||||
|
msgstr ""
|
||||||
|
|
||||||
# PKC Settings - Playback
|
# PKC Settings - Playback
|
||||||
msgctxt "#30527"
|
msgctxt "#30527"
|
||||||
msgid "Ignore specials in next episodes"
|
msgid "Ignore specials in next episodes"
|
||||||
|
|
|
@ -476,6 +476,8 @@ class KodiMusicDB(common.KodiDBBase):
|
||||||
elif kodi_type == v.KODI_TYPE_ALBUM:
|
elif kodi_type == v.KODI_TYPE_ALBUM:
|
||||||
column = 'iUserrating'
|
column = 'iUserrating'
|
||||||
identifier = 'idAlbum'
|
identifier = 'idAlbum'
|
||||||
|
else:
|
||||||
|
return
|
||||||
self.cursor.execute('''UPDATE %s SET %s = ? WHERE ? = ?'''
|
self.cursor.execute('''UPDATE %s SET %s = ? WHERE ? = ?'''
|
||||||
% (kodi_type, column),
|
% (kodi_type, column),
|
||||||
(userrating, identifier, kodi_id))
|
(userrating, identifier, kodi_id))
|
||||||
|
|
|
@ -5,13 +5,13 @@ from logging import getLogger
|
||||||
import Queue
|
import Queue
|
||||||
import copy
|
import copy
|
||||||
|
|
||||||
|
import xbmcgui
|
||||||
|
|
||||||
from cProfile import Profile
|
from cProfile import Profile
|
||||||
from pstats import Stats
|
from pstats import Stats
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
|
|
||||||
from .get_metadata import GetMetadataTask, reset_collections
|
from .get_metadata import GetMetadataTask, reset_collections
|
||||||
from .process_metadata import InitNewSection, UpdateLastSyncAndPlaystate, \
|
|
||||||
ProcessMetadata, DeleteItem
|
|
||||||
from . import common, sections
|
from . import common, sections
|
||||||
from .. import utils, timing, backgroundthread, variables as v, app
|
from .. import utils, timing, backgroundthread, variables as v, app
|
||||||
from .. import plex_functions as PF, itemtypes
|
from .. import plex_functions as PF, itemtypes
|
||||||
|
@ -26,6 +26,45 @@ else:
|
||||||
PLAYLIST_SYNC_ENABLED = False
|
PLAYLIST_SYNC_ENABLED = False
|
||||||
|
|
||||||
LOG = getLogger('PLEX.sync.full_sync')
|
LOG = getLogger('PLEX.sync.full_sync')
|
||||||
|
# How many items will be put through the processing chain at once?
|
||||||
|
BATCH_SIZE = 2000
|
||||||
|
# Safety margin to filter PMS items - how many seconds to look into the past?
|
||||||
|
UPDATED_AT_SAFETY = 60 * 5
|
||||||
|
LAST_VIEWED_AT_SAFETY = 60 * 5
|
||||||
|
|
||||||
|
|
||||||
|
def tag_last(iterable):
|
||||||
|
"""
|
||||||
|
Given some iterable, returns (last, item), where last is only True if you
|
||||||
|
are on the final iteration.
|
||||||
|
"""
|
||||||
|
iterator = iter(iterable)
|
||||||
|
gotone = False
|
||||||
|
try:
|
||||||
|
lookback = next(iterator)
|
||||||
|
gotone = True
|
||||||
|
while True:
|
||||||
|
cur = next(iterator)
|
||||||
|
yield False, lookback
|
||||||
|
lookback = cur
|
||||||
|
except StopIteration:
|
||||||
|
if gotone:
|
||||||
|
yield True, lookback
|
||||||
|
raise StopIteration()
|
||||||
|
|
||||||
|
|
||||||
|
class InitNewSection(object):
|
||||||
|
"""
|
||||||
|
Throw this into the queue used for ProcessMetadata to tell it which
|
||||||
|
Plex library section we're looking at
|
||||||
|
"""
|
||||||
|
def __init__(self, context, total_number_of_items, section_name,
|
||||||
|
section_id, plex_type):
|
||||||
|
self.context = context
|
||||||
|
self.total = total_number_of_items
|
||||||
|
self.name = section_name
|
||||||
|
self.id = section_id
|
||||||
|
self.plex_type = plex_type
|
||||||
|
|
||||||
|
|
||||||
class FullSync(common.libsync_mixin):
|
class FullSync(common.libsync_mixin):
|
||||||
|
@ -36,19 +75,41 @@ class FullSync(common.libsync_mixin):
|
||||||
self._canceled = False
|
self._canceled = False
|
||||||
self.repair = repair
|
self.repair = repair
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
self.show_dialog = show_dialog
|
|
||||||
self.queue = None
|
self.queue = None
|
||||||
self.process_thread = None
|
self.process_thread = None
|
||||||
self.current_sync = None
|
self.current_sync = None
|
||||||
self.plexdb = None
|
self.plexdb = None
|
||||||
self.plex_type = None
|
self.plex_type = None
|
||||||
self.section_type = None
|
self.section_type = None
|
||||||
self.processing_thread = None
|
self.worker_count = int(utils.settings('syncThreadNumber'))
|
||||||
|
self.item_count = 0
|
||||||
|
# For progress dialog
|
||||||
|
self.show_dialog = show_dialog
|
||||||
|
self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true'
|
||||||
|
self.dialog = None
|
||||||
|
self.total = 0
|
||||||
|
self.current = 1
|
||||||
|
self.processed = 0
|
||||||
|
self.title = ''
|
||||||
|
self.section = None
|
||||||
|
self.section_name = None
|
||||||
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
||||||
self.threader = backgroundthread.ThreaderManager(
|
self.threader = backgroundthread.ThreaderManager(
|
||||||
worker=backgroundthread.NonstoppingBackgroundWorker)
|
worker=backgroundthread.NonstoppingBackgroundWorker,
|
||||||
|
worker_count=self.worker_count)
|
||||||
super(FullSync, self).__init__()
|
super(FullSync, self).__init__()
|
||||||
|
|
||||||
|
def update_progressbar(self):
|
||||||
|
if self.show_dialog:
|
||||||
|
try:
|
||||||
|
progress = int(float(self.current) / float(self.total) * 100.0)
|
||||||
|
except ZeroDivisionError:
|
||||||
|
progress = 0
|
||||||
|
self.dialog.update(progress,
|
||||||
|
'%s (%s)' % (self.section_name, self.section_type_text),
|
||||||
|
'%s/%s %s'
|
||||||
|
% (self.current, self.total, self.title))
|
||||||
|
|
||||||
def process_item(self, xml_item):
|
def process_item(self, xml_item):
|
||||||
"""
|
"""
|
||||||
Processes a single library item
|
Processes a single library item
|
||||||
|
@ -58,30 +119,87 @@ class FullSync(common.libsync_mixin):
|
||||||
int('%s%s' % (plex_id,
|
int('%s%s' % (plex_id,
|
||||||
xml_item.get('updatedAt',
|
xml_item.get('updatedAt',
|
||||||
xml_item.get('addedAt', 1541572987)))):
|
xml_item.get('addedAt', 1541572987)))):
|
||||||
# Already got EXACTLY this item in our DB. BUT need to collect all
|
|
||||||
# DB updates within the same thread
|
|
||||||
self.queue.put(UpdateLastSyncAndPlaystate(plex_id, xml_item))
|
|
||||||
return
|
return
|
||||||
task = GetMetadataTask()
|
self.threader.addTask(GetMetadataTask(self.queue,
|
||||||
task.setup(self.queue, plex_id, self.plex_type, self.get_children)
|
plex_id,
|
||||||
self.threader.addTask(task)
|
self.plex_type,
|
||||||
|
self.get_children))
|
||||||
|
self.item_count += 1
|
||||||
|
|
||||||
def process_delete(self):
|
def process_playstate(self, xml_item):
|
||||||
"""
|
"""
|
||||||
Removes all the items that have NOT been updated (last_sync timestamp
|
Processes the playstate of a single library item
|
||||||
is different)
|
|
||||||
"""
|
"""
|
||||||
for plex_id in self.plexdb.plex_id_by_last_sync(self.plex_type,
|
plex_id = int(xml_item.get('ratingKey'))
|
||||||
self.current_sync):
|
if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \
|
||||||
if self.isCanceled():
|
int('%s%s' % (plex_id,
|
||||||
|
xml_item.get('updatedAt',
|
||||||
|
xml_item.get('addedAt', 1541572987)))):
|
||||||
return
|
return
|
||||||
self.queue.put(DeleteItem(plex_id))
|
self.threader.addTask(GetMetadataTask(self.queue,
|
||||||
|
plex_id,
|
||||||
|
self.plex_type,
|
||||||
|
self.get_children))
|
||||||
|
self.item_count += 1
|
||||||
|
|
||||||
|
def update_library(self):
|
||||||
|
LOG.debug('Writing changes to Kodi library now')
|
||||||
|
i = 0
|
||||||
|
if not self.section:
|
||||||
|
self.section = self.queue.get()
|
||||||
|
self.queue.task_done()
|
||||||
|
while not self.isCanceled() and self.item_count > 0:
|
||||||
|
section = self.section
|
||||||
|
if not section:
|
||||||
|
break
|
||||||
|
LOG.debug('Start or continue processing section %s (%ss)',
|
||||||
|
section.name, section.plex_type)
|
||||||
|
self.current = 1
|
||||||
|
self.processed = 0
|
||||||
|
self.total = section.total
|
||||||
|
self.section_name = section.name
|
||||||
|
self.section_type_text = utils.lang(
|
||||||
|
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
|
||||||
|
with section.context(self.current_sync) as context:
|
||||||
|
while not self.isCanceled() and self.item_count > 0:
|
||||||
|
try:
|
||||||
|
item = self.queue.get(block=False)
|
||||||
|
except backgroundthread.Queue.Empty:
|
||||||
|
if self.threader.threader.working():
|
||||||
|
app.APP.monitor.waitForAbort(0.02)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# Try again, in case a thread just finished
|
||||||
|
i += 1
|
||||||
|
if i == 3:
|
||||||
|
break
|
||||||
|
continue
|
||||||
|
i = 0
|
||||||
|
self.queue.task_done()
|
||||||
|
if isinstance(item, dict):
|
||||||
|
context.add_update(item['xml'][0],
|
||||||
|
section_name=section.name,
|
||||||
|
section_id=section.id,
|
||||||
|
children=item['children'])
|
||||||
|
self.title = item['xml'][0].get('title')
|
||||||
|
self.processed += 1
|
||||||
|
elif isinstance(item, InitNewSection) or item is None:
|
||||||
|
self.section = item
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
raise ValueError('Unknown type %s' % type(item))
|
||||||
|
self.item_count -= 1
|
||||||
|
self.update_progressbar()
|
||||||
|
self.current += 1
|
||||||
|
if self.processed == 500:
|
||||||
|
self.processed = 0
|
||||||
|
context.commit()
|
||||||
|
LOG.debug('Done writing changes to Kodi library')
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def process_section(self, section):
|
def addupdate_section(self, section):
|
||||||
LOG.debug('Processing library section %s', section)
|
LOG.debug('Processing library section for new or changed items %s',
|
||||||
if self.isCanceled():
|
section)
|
||||||
return False
|
|
||||||
if not self.install_sync_done:
|
if not self.install_sync_done:
|
||||||
app.SYNC.path_verified = False
|
app.SYNC.path_verified = False
|
||||||
try:
|
try:
|
||||||
|
@ -94,45 +212,75 @@ class FullSync(common.libsync_mixin):
|
||||||
section['section_id'],
|
section['section_id'],
|
||||||
section['plex_type'])
|
section['plex_type'])
|
||||||
self.queue.put(queue_info)
|
self.queue.put(queue_info)
|
||||||
|
last = True
|
||||||
|
# To keep track of the item-number in order to kill while loops
|
||||||
|
self.item_count = 0
|
||||||
|
while True:
|
||||||
|
# Check Plex DB to see what we need to add/update
|
||||||
with PlexDB() as self.plexdb:
|
with PlexDB() as self.plexdb:
|
||||||
for xml_item in iterator:
|
for i, (last, xml_item) in enumerate(tag_last(iterator)):
|
||||||
if self.isCanceled():
|
if self.isCanceled():
|
||||||
return False
|
return False
|
||||||
self.process_item(xml_item)
|
self.process_item(xml_item)
|
||||||
|
if self.item_count == BATCH_SIZE:
|
||||||
|
break
|
||||||
|
# Make sure Plex DB above is closed before adding/updating
|
||||||
|
if self.item_count == BATCH_SIZE:
|
||||||
|
self.update_library()
|
||||||
|
if last:
|
||||||
|
break
|
||||||
|
self.update_library()
|
||||||
|
reset_collections()
|
||||||
|
return True
|
||||||
except RuntimeError:
|
except RuntimeError:
|
||||||
LOG.error('Could not entirely process section %s', section)
|
LOG.error('Could not entirely process section %s', section)
|
||||||
return False
|
return False
|
||||||
LOG.debug('Waiting for download threads to finish')
|
|
||||||
while self.threader.threader.working():
|
@utils.log_time
|
||||||
app.APP.monitor.waitForAbort(0.1)
|
def playstate_per_section(self, section):
|
||||||
reset_collections()
|
LOG.debug('Processing playstate for library section %s', section)
|
||||||
try:
|
try:
|
||||||
# Tell the processing thread that we're syncing playstate
|
# Sync new, updated and deleted items
|
||||||
|
iterator = section['iterator']
|
||||||
|
# Tell the processing thread about this new section
|
||||||
queue_info = InitNewSection(section['context'],
|
queue_info = InitNewSection(section['context'],
|
||||||
iterator.total,
|
iterator.total,
|
||||||
iterator.get('librarySectionTitle'),
|
section['section_name'],
|
||||||
section['section_id'],
|
section['section_id'],
|
||||||
section['plex_type'])
|
section['plex_type'])
|
||||||
self.queue.put(queue_info)
|
self.queue.put(queue_info)
|
||||||
LOG.debug('Waiting for processing thread to finish section')
|
self.total = iterator.total
|
||||||
# Make sure that the processing thread commits all changes
|
self.section_name = section['section_name']
|
||||||
self.queue.join()
|
self.section_type_text = utils.lang(
|
||||||
with PlexDB() as self.plexdb:
|
v.TRANSLATION_FROM_PLEXTYPE[section['plex_type']])
|
||||||
# Delete movies that are not on Plex anymore
|
self.current = 0
|
||||||
LOG.debug('Look for items to delete')
|
with section['context'](self.current_sync) as itemtype:
|
||||||
self.process_delete()
|
for xml_item in iterator:
|
||||||
# Wait again till the processing thread is done
|
if self.isCanceled():
|
||||||
self.queue.join()
|
|
||||||
except RuntimeError:
|
|
||||||
LOG.error('Could not process playstate for section %s', section)
|
|
||||||
return False
|
return False
|
||||||
LOG.debug('Done processing playstate for section')
|
itemtype.update_userdata(xml_item, section['plex_type'])
|
||||||
|
itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']),
|
||||||
|
section['plex_type'],
|
||||||
|
self.current_sync)
|
||||||
|
self.current += 1
|
||||||
|
self.update_progressbar()
|
||||||
return True
|
return True
|
||||||
|
except RuntimeError:
|
||||||
|
LOG.error('Could not entirely process section %s', section)
|
||||||
|
return False
|
||||||
|
|
||||||
def threaded_get_iterators(self, kinds, queue):
|
def threaded_get_iterators(self, kinds, queue, updated_at=None,
|
||||||
|
last_viewed_at=None):
|
||||||
"""
|
"""
|
||||||
PF.SectionItems is costly, so let's do it asynchronous
|
PF.SectionItems is costly, so let's do it asynchronous
|
||||||
"""
|
"""
|
||||||
|
if self.repair:
|
||||||
|
updated_at = None
|
||||||
|
last_viewed_at = None
|
||||||
|
else:
|
||||||
|
updated_at = updated_at - UPDATED_AT_SAFETY if updated_at else None
|
||||||
|
last_viewed_at = last_viewed_at - LAST_VIEWED_AT_SAFETY \
|
||||||
|
if last_viewed_at else None
|
||||||
try:
|
try:
|
||||||
for kind in kinds:
|
for kind in kinds:
|
||||||
for section in (x for x in sections.SECTIONS
|
for section in (x for x in sections.SECTIONS
|
||||||
|
@ -146,7 +294,9 @@ class FullSync(common.libsync_mixin):
|
||||||
element['context'] = kind[2]
|
element['context'] = kind[2]
|
||||||
element['get_children'] = kind[3]
|
element['get_children'] = kind[3]
|
||||||
element['iterator'] = PF.SectionItems(section['section_id'],
|
element['iterator'] = PF.SectionItems(section['section_id'],
|
||||||
plex_type=kind[0])
|
plex_type=kind[0],
|
||||||
|
updated_at=updated_at,
|
||||||
|
last_viewed_at=last_viewed_at)
|
||||||
queue.put(element)
|
queue.put(element)
|
||||||
finally:
|
finally:
|
||||||
queue.put(None)
|
queue.put(None)
|
||||||
|
@ -165,16 +315,24 @@ class FullSync(common.libsync_mixin):
|
||||||
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False),
|
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False),
|
||||||
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True),
|
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True),
|
||||||
])
|
])
|
||||||
|
# ADD NEW ITEMS
|
||||||
# Already start setting up the iterators. We need to enforce
|
# Already start setting up the iterators. We need to enforce
|
||||||
# syncing e.g. show before season before episode
|
# syncing e.g. show before season before episode
|
||||||
|
if not self.show_dialog_userdata and self.dialog:
|
||||||
|
# Close the progress indicator dialog
|
||||||
|
self.dialog.close()
|
||||||
|
self.dialog = None
|
||||||
iterator_queue = Queue.Queue()
|
iterator_queue = Queue.Queue()
|
||||||
|
updated_at = int(utils.settings('lastfullsync')) or None
|
||||||
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
||||||
None,
|
None,
|
||||||
kinds,
|
kinds,
|
||||||
iterator_queue)
|
iterator_queue,
|
||||||
|
updated_at=updated_at)
|
||||||
backgroundthread.BGThreader.addTask(task)
|
backgroundthread.BGThreader.addTask(task)
|
||||||
while True:
|
while True:
|
||||||
section = iterator_queue.get()
|
section = iterator_queue.get()
|
||||||
|
iterator_queue.task_done()
|
||||||
if section is None:
|
if section is None:
|
||||||
break
|
break
|
||||||
# Setup our variables
|
# Setup our variables
|
||||||
|
@ -183,53 +341,76 @@ class FullSync(common.libsync_mixin):
|
||||||
self.context = section['context']
|
self.context = section['context']
|
||||||
self.get_children = section['get_children']
|
self.get_children = section['get_children']
|
||||||
# Now do the heavy lifting
|
# Now do the heavy lifting
|
||||||
if self.isCanceled() or not self.process_section(section):
|
if self.isCanceled() or not self.addupdate_section(section):
|
||||||
return False
|
return False
|
||||||
|
# SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that
|
||||||
|
# were set to unwatched). Also mark all items on the PMS to be able
|
||||||
|
# to delete the ones still in Kodi
|
||||||
|
LOG.info('Start synching playstate and userdata for every item')
|
||||||
|
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
||||||
|
None,
|
||||||
|
kinds,
|
||||||
|
iterator_queue)
|
||||||
|
backgroundthread.BGThreader.addTask(task)
|
||||||
|
while True:
|
||||||
|
section = iterator_queue.get()
|
||||||
iterator_queue.task_done()
|
iterator_queue.task_done()
|
||||||
|
if section is None:
|
||||||
|
break
|
||||||
|
# Setup our variables
|
||||||
|
self.plex_type = section['plex_type']
|
||||||
|
self.section_type = section['section_type']
|
||||||
|
self.context = section['context']
|
||||||
|
self.get_children = section['get_children']
|
||||||
|
# Now do the heavy lifting
|
||||||
|
if self.isCanceled() or not self.playstate_per_section(section):
|
||||||
|
return False
|
||||||
|
# Delete movies that are not on Plex anymore
|
||||||
|
LOG.info('Looking for items to delete')
|
||||||
|
with section['context'](self.current_sync) as context:
|
||||||
|
for plex_id in context.plexdb.plex_id_by_last_sync(self.plex_type,
|
||||||
|
self.current_sync):
|
||||||
|
if self.isCanceled():
|
||||||
|
return False
|
||||||
|
context.remove(plex_id, self.plex_type)
|
||||||
|
LOG.debug('Done deleting')
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def run(self):
|
def run(self):
|
||||||
profile = Profile()
|
profile = Profile()
|
||||||
profile.enable()
|
profile.enable()
|
||||||
if self.isCanceled():
|
self.current_sync = timing.plex_now()
|
||||||
return
|
|
||||||
successful = False
|
|
||||||
self.current_sync = timing.unix_timestamp()
|
|
||||||
# Delete playlist and video node files from Kodi
|
# Delete playlist and video node files from Kodi
|
||||||
utils.delete_playlists()
|
utils.delete_playlists()
|
||||||
utils.delete_nodes()
|
utils.delete_nodes()
|
||||||
# Get latest Plex libraries and build playlist and video node files
|
# Get latest Plex libraries and build playlist and video node files
|
||||||
if not sections.sync_from_pms():
|
if not sections.sync_from_pms():
|
||||||
return
|
return
|
||||||
|
successful = False
|
||||||
try:
|
try:
|
||||||
# Fire up our single processing thread
|
self.queue = backgroundthread.Queue.Queue()
|
||||||
self.queue = backgroundthread.Queue.Queue(maxsize=1000)
|
if self.show_dialog:
|
||||||
self.processing_thread = ProcessMetadata(self.queue,
|
self.dialog = xbmcgui.DialogProgressBG()
|
||||||
self.current_sync,
|
self.dialog.create(utils.lang(39714))
|
||||||
self.show_dialog)
|
|
||||||
self.processing_thread.start()
|
|
||||||
|
|
||||||
# Actual syncing - do only new items first
|
# Actual syncing - do only new items first
|
||||||
LOG.info('Running full_library_sync with repair=%s',
|
LOG.info('Running full_library_sync with repair=%s',
|
||||||
self.repair)
|
self.repair)
|
||||||
if not self.full_library_sync():
|
if not self.full_library_sync():
|
||||||
return
|
return
|
||||||
# Tell the processing thread to exit with one last element None
|
|
||||||
self.queue.put(None)
|
|
||||||
if self.isCanceled():
|
if self.isCanceled():
|
||||||
return
|
return
|
||||||
if PLAYLIST_SYNC_ENABLED and not playlists.full_sync():
|
if PLAYLIST_SYNC_ENABLED and not playlists.full_sync():
|
||||||
return
|
return
|
||||||
successful = True
|
successful = True
|
||||||
except:
|
|
||||||
utils.ERROR(txt='full_sync.py crashed', notify=True)
|
|
||||||
finally:
|
finally:
|
||||||
# This will block until the processing thread really exits
|
|
||||||
LOG.debug('Waiting for processing thread to exit')
|
|
||||||
self.processing_thread.join()
|
|
||||||
common.update_kodi_library(video=True, music=True)
|
common.update_kodi_library(video=True, music=True)
|
||||||
|
if self.dialog:
|
||||||
|
self.dialog.close()
|
||||||
self.threader.shutdown()
|
self.threader.shutdown()
|
||||||
|
if successful:
|
||||||
|
utils.settings('lastfullsync', value=str(int(self.current_sync)))
|
||||||
if self.callback:
|
if self.callback:
|
||||||
self.callback(successful)
|
self.callback(successful)
|
||||||
LOG.info('Done full_sync')
|
LOG.info('Done full_sync')
|
||||||
|
|
|
@ -36,11 +36,12 @@ class GetMetadataTask(common.libsync_mixin, backgroundthread.Task):
|
||||||
queue Queue.Queue() object where this thread will store
|
queue Queue.Queue() object where this thread will store
|
||||||
the downloaded metadata XMLs as etree objects
|
the downloaded metadata XMLs as etree objects
|
||||||
"""
|
"""
|
||||||
def setup(self, queue, plex_id, plex_type, get_children=False):
|
def __init__(self, queue, plex_id, plex_type, get_children=False):
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
self.plex_id = plex_id
|
self.plex_id = plex_id
|
||||||
self.plex_type = plex_type
|
self.plex_type = plex_type
|
||||||
self.get_children = get_children
|
self.get_children = get_children
|
||||||
|
super(GetMetadataTask, self).__init__()
|
||||||
|
|
||||||
def _collections(self, item):
|
def _collections(self, item):
|
||||||
global COLLECTION_MATCH, COLLECTION_XMLS
|
global COLLECTION_MATCH, COLLECTION_XMLS
|
||||||
|
|
|
@ -1,146 +0,0 @@
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
from __future__ import absolute_import, division, unicode_literals
|
|
||||||
from logging import getLogger
|
|
||||||
import xbmcgui
|
|
||||||
|
|
||||||
from cProfile import Profile
|
|
||||||
from pstats import Stats
|
|
||||||
from StringIO import StringIO
|
|
||||||
|
|
||||||
from . import common
|
|
||||||
from .. import backgroundthread, utils, variables as v
|
|
||||||
|
|
||||||
LOG = getLogger('PLEX.sync.process_metadata')
|
|
||||||
|
|
||||||
|
|
||||||
class InitNewSection(object):
|
|
||||||
"""
|
|
||||||
Throw this into the queue used for ProcessMetadata to tell it which
|
|
||||||
Plex library section we're looking at
|
|
||||||
"""
|
|
||||||
def __init__(self, context, total_number_of_items, section_name,
|
|
||||||
section_id, plex_type):
|
|
||||||
self.context = context
|
|
||||||
self.total = total_number_of_items
|
|
||||||
self.name = section_name
|
|
||||||
self.id = section_id
|
|
||||||
self.plex_type = plex_type
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateLastSyncAndPlaystate(object):
|
|
||||||
def __init__(self, plex_id, xml_item):
|
|
||||||
self.plex_id = plex_id
|
|
||||||
self.xml_item = xml_item
|
|
||||||
|
|
||||||
|
|
||||||
class DeleteItem(object):
|
|
||||||
def __init__(self, plex_id):
|
|
||||||
self.plex_id = plex_id
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
|
|
||||||
"""
|
|
||||||
Not yet implemented for more than 1 thread - if ever. Only to be called by
|
|
||||||
ONE thread!
|
|
||||||
Processes the XML metadata in the queue
|
|
||||||
"""
|
|
||||||
def __init__(self, queue, last_sync, show_dialog):
|
|
||||||
self._canceled = False
|
|
||||||
self.queue = queue
|
|
||||||
self.last_sync = last_sync
|
|
||||||
self.show_dialog = show_dialog
|
|
||||||
self.total = 0
|
|
||||||
self.current = 1
|
|
||||||
self.processed = 0
|
|
||||||
self.title = ''
|
|
||||||
self.section_name = None
|
|
||||||
self.dialog = None
|
|
||||||
super(ProcessMetadata, self).__init__()
|
|
||||||
|
|
||||||
def update_progressbar(self):
|
|
||||||
if self.show_dialog:
|
|
||||||
try:
|
|
||||||
progress = int(float(self.current) / float(self.total) * 100.0)
|
|
||||||
except ZeroDivisionError:
|
|
||||||
progress = 0
|
|
||||||
self.dialog.update(progress,
|
|
||||||
'%s (%s)' % (self.section_name, self.section_type_text),
|
|
||||||
'%s/%s %s'
|
|
||||||
% (self.current, self.total, self.title))
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
LOG.debug('Processing thread started')
|
|
||||||
if self.show_dialog:
|
|
||||||
self.dialog = xbmcgui.DialogProgressBG()
|
|
||||||
self.dialog.create(utils.lang(39714))
|
|
||||||
try:
|
|
||||||
self._run()
|
|
||||||
except:
|
|
||||||
utils.ERROR(notify=True, cancel_sync=True)
|
|
||||||
finally:
|
|
||||||
if self.dialog:
|
|
||||||
self.dialog.close()
|
|
||||||
while not self.queue.empty():
|
|
||||||
# We need to empty the queue to let full_sync finish join()
|
|
||||||
self.queue.get()
|
|
||||||
self.queue.task_done()
|
|
||||||
LOG.debug('Processing thread terminated')
|
|
||||||
|
|
||||||
def _run(self):
|
|
||||||
"""
|
|
||||||
Do the work
|
|
||||||
"""
|
|
||||||
# Init with the very first library section. This will block!
|
|
||||||
section = self.queue.get()
|
|
||||||
self.queue.task_done()
|
|
||||||
if section is None:
|
|
||||||
return
|
|
||||||
while not self.isCanceled():
|
|
||||||
if section is None:
|
|
||||||
break
|
|
||||||
LOG.debug('Start processing section %s (%ss)',
|
|
||||||
section.name, section.plex_type)
|
|
||||||
self.current = 1
|
|
||||||
self.processed = 0
|
|
||||||
self.total = section.total
|
|
||||||
self.section_name = section.name
|
|
||||||
self.section_type_text = utils.lang(
|
|
||||||
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
|
|
||||||
profile = Profile()
|
|
||||||
profile.enable()
|
|
||||||
with section.context(self.last_sync) as context:
|
|
||||||
while not self.isCanceled():
|
|
||||||
# grabs item from queue. This will block!
|
|
||||||
item = self.queue.get()
|
|
||||||
if isinstance(item, dict):
|
|
||||||
context.add_update(item['xml'][0],
|
|
||||||
section_name=section.name,
|
|
||||||
section_id=section.id,
|
|
||||||
children=item['children'])
|
|
||||||
self.title = item['xml'][0].get('title')
|
|
||||||
self.processed += 1
|
|
||||||
elif isinstance(item, UpdateLastSyncAndPlaystate):
|
|
||||||
context.plexdb.update_last_sync(item.plex_id,
|
|
||||||
section.plex_type,
|
|
||||||
self.last_sync)
|
|
||||||
if section.plex_type != v.PLEX_TYPE_ARTIST:
|
|
||||||
context.update_userdata(item.xml_item,
|
|
||||||
section.plex_type)
|
|
||||||
elif isinstance(item, InitNewSection) or item is None:
|
|
||||||
section = item
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
context.remove(item.plex_id, plex_type=section.plex_type)
|
|
||||||
self.update_progressbar()
|
|
||||||
self.current += 1
|
|
||||||
if self.processed == 500:
|
|
||||||
self.processed = 0
|
|
||||||
context.commit()
|
|
||||||
self.queue.task_done()
|
|
||||||
self.queue.task_done()
|
|
||||||
profile.disable()
|
|
||||||
string_io = StringIO()
|
|
||||||
stats = Stats(profile, stream=string_io).sort_stats('cumulative')
|
|
||||||
stats.print_stats()
|
|
||||||
LOG.info('cProfile result: ')
|
|
||||||
LOG.info(string_io.getvalue())
|
|
|
@ -46,6 +46,14 @@ def plex_date_to_kodi(plex_timestamp):
|
||||||
localtime(float(plex_timestamp) + KODI_PLEX_TIME_OFFSET))
|
localtime(float(plex_timestamp) + KODI_PLEX_TIME_OFFSET))
|
||||||
|
|
||||||
|
|
||||||
|
def kodi_date_to_plex(kodi_timestamp):
|
||||||
|
return float(kodi_timestamp) - KODI_PLEX_TIME_OFFSET
|
||||||
|
|
||||||
|
|
||||||
|
def plex_now():
|
||||||
|
return kodi_date_to_plex(unix_timestamp())
|
||||||
|
|
||||||
|
|
||||||
def kodi_timestamp(plex_timestamp):
|
def kodi_timestamp(plex_timestamp):
|
||||||
return unix_date_to_kodi(plex_timestamp)
|
return unix_date_to_kodi(plex_timestamp)
|
||||||
|
|
||||||
|
|
|
@ -477,6 +477,7 @@ def wipe_database():
|
||||||
kodi_db.reset_cached_images()
|
kodi_db.reset_cached_images()
|
||||||
# reset the install run flag
|
# reset the install run flag
|
||||||
settings('SyncInstallRunDone', value="false")
|
settings('SyncInstallRunDone', value="false")
|
||||||
|
settings('lastfullsync', value="0")
|
||||||
LOG.info('Wiping done')
|
LOG.info('Wiping done')
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,7 @@
|
||||||
<setting id="fullSyncInterval" type="number" label="39053" default="60" option="int" />
|
<setting id="fullSyncInterval" type="number" label="39053" default="60" option="int" />
|
||||||
<setting id="dbSyncScreensaver" type="bool" label="39062" default="false" /><!--Sync when screensaver is deactivated-->
|
<setting id="dbSyncScreensaver" type="bool" label="39062" default="false" /><!--Sync when screensaver is deactivated-->
|
||||||
<setting id="dbSyncIndicator" label="30507" type="bool" default="true" /><!-- show syncing progress -->
|
<setting id="dbSyncIndicator" label="30507" type="bool" default="true" /><!-- show syncing progress -->
|
||||||
|
<setting id="playstate_sync_indicator" label="30523" type="bool" default="false" visible="eq(-1,true)" subsetting="true"/><!-- Also show sync progress for playstate and user data -->
|
||||||
<setting id="syncThreadNumber" type="slider" label="39003" default="10" option="int" range="1,1,30"/><!-- Number of simultaneous download threads -->
|
<setting id="syncThreadNumber" type="slider" label="39003" default="10" option="int" range="1,1,30"/><!-- Number of simultaneous download threads -->
|
||||||
<setting id="limitindex" type="slider" label="30515" default="200" option="int" range="50,50,1000"/><!-- Maximum items to request from the server at once -->
|
<setting id="limitindex" type="slider" label="30515" default="200" option="int" range="50,50,1000"/><!-- Maximum items to request from the server at once -->
|
||||||
<setting type="lsep" label="$LOCALIZE[136]" /><!-- Playlists -->
|
<setting type="lsep" label="$LOCALIZE[136]" /><!-- Playlists -->
|
||||||
|
@ -82,6 +83,7 @@
|
||||||
<setting id="themoviedbAPIKey" type="text" default="19c90103adb9e98f2172c6a6a3d85dc4" visible="false"/>
|
<setting id="themoviedbAPIKey" type="text" default="19c90103adb9e98f2172c6a6a3d85dc4" visible="false"/>
|
||||||
<setting id="FanArtTVAPIKey" type="text" default="639191cb0774661597f28a47e7e2bad5" visible="false"/>
|
<setting id="FanArtTVAPIKey" type="text" default="639191cb0774661597f28a47e7e2bad5" visible="false"/>
|
||||||
<setting id="syncEmptyShows" type="bool" label="30508" default="false" visible="false"/>
|
<setting id="syncEmptyShows" type="bool" label="30508" default="false" visible="false"/>
|
||||||
|
<setting id="lastfullsync" type="number" label="Time stamp when last successful full sync was conducted" default="0" visible="false" />
|
||||||
</category>
|
</category>
|
||||||
|
|
||||||
<category label="39057"><!-- Customize Paths -->
|
<category label="39057"><!-- Customize Paths -->
|
||||||
|
|
Loading…
Reference in a new issue