Rewire llibrary sync, part 2

This commit is contained in:
croneter 2018-10-21 12:03:21 +02:00
parent e6692a9012
commit 3f4c43e373
8 changed files with 184 additions and 160 deletions

View file

@ -44,7 +44,8 @@ class ItemBase(object):
Input: Input:
kodiType: optional argument; e.g. 'video' or 'music' kodiType: optional argument; e.g. 'video' or 'music'
""" """
def __init__(self, plex_db=None, kodi_db=None): def __init__(self, last_sync, plex_db=None, kodi_db=None):
self.last_sync = last_sync
self.artwork = artwork.Artwork() self.artwork = artwork.Artwork()
self.plexconn = None self.plexconn = None
self.plexcursor = plex_db.plexcursor if plex_db else None self.plexcursor = plex_db.plexcursor if plex_db else None

View file

@ -134,7 +134,8 @@ class Show(ItemBase, TvShowMixin):
""" """
For Plex library-type TV shows For Plex library-type TV shows
""" """
def add_update(self, xml, viewtag=None, viewid=None): def add_update(self, xml, section_name=None, section_id=None,
children=None):
""" """
Process a single show Process a single show
""" """
@ -199,15 +200,6 @@ class Show(ItemBase, TvShowMixin):
if update_item: if update_item:
LOG.info("UPDATE tvshow plex_id: %s - Title: %s", LOG.info("UPDATE tvshow plex_id: %s - Title: %s",
plex_id, api.title()) plex_id, api.title())
# Add reference is idempotent; the call here updates also fileid
# and path_id when item is moved or renamed
self.plex_db.addReference(plex_id,
v.PLEX_TYPE_SHOW,
kodi_id,
v.KODI_TYPE_SHOW,
kodi_pathid=path_id,
checksum=api.checksum(),
view_id=viewid)
# update new ratings Kodi 17 # update new ratings Kodi 17
rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW) rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW)
self.kodi_db.update_ratings(kodi_id, self.kodi_db.update_ratings(kodi_id,
@ -248,13 +240,7 @@ class Show(ItemBase, TvShowMixin):
query = "INSERT INTO tvshowlinkpath(idShow, idPath) values (?, ?)" query = "INSERT INTO tvshowlinkpath(idShow, idPath) values (?, ?)"
self.kodicursor.execute(query, (kodi_id, path_id)) self.kodicursor.execute(query, (kodi_id, path_id))
# Create the reference in plex table # Create the reference in plex table
self.plex_db.addReference(plex_id,
v.PLEX_TYPE_SHOW,
kodi_id,
v.KODI_TYPE_SHOW,
kodi_pathid=path_id,
checksum=api.checksum(),
view_id=viewid)
rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW) rating_id = self.kodi_db.get_ratingid(kodi_id, v.KODI_TYPE_SHOW)
self.kodi_db.add_ratings(rating_id, self.kodi_db.add_ratings(rating_id,
kodi_id, kodi_id,
@ -295,13 +281,22 @@ class Show(ItemBase, TvShowMixin):
# Process studios # Process studios
self.kodi_db.modify_studios(kodi_id, v.KODI_TYPE_SHOW, studios) self.kodi_db.modify_studios(kodi_id, v.KODI_TYPE_SHOW, studios)
# Process tags: view, PMS collection tags # Process tags: view, PMS collection tags
tags = [viewtag] tags = [section_name]
tags.extend([i for _, i in api.collection_list()]) tags.extend([i for _, i in api.collection_list()])
self.kodi_db.modify_tags(kodi_id, v.KODI_TYPE_SHOW, tags) self.kodi_db.modify_tags(kodi_id, v.KODI_TYPE_SHOW, tags)
self.plex_db.addReference(plex_id,
v.PLEX_TYPE_SHOW,
kodi_id,
v.KODI_TYPE_SHOW,
kodi_pathid=path_id,
checksum=api.checksum(),
view_id=section_id,
last_sync=self.last_sync)
class Season(ItemBase, TvShowMixin): class Season(ItemBase, TvShowMixin):
def add_update(self, xml, viewtag=None, viewid=None): def add_update(self, xml, section_name=None, section_id=None,
children=None):
""" """
Process a single season of a certain tv show Process a single season of a certain tv show
""" """
@ -334,12 +329,14 @@ class Season(ItemBase, TvShowMixin):
kodi_id, kodi_id,
v.KODI_TYPE_SEASON, v.KODI_TYPE_SEASON,
parent_id=show_id, parent_id=show_id,
view_id=viewid, view_id=section_id,
checksum=api.checksum()) checksum=api.checksum(),
last_sync=self.last_sync)
class Episode(ItemBase, TvShowMixin): class Episode(ItemBase, TvShowMixin):
def add_update(self, xml, viewtag=None, viewid=None): def add_update(self, xml, section_name=None, section_id=None,
children=None):
""" """
Process single episode Process single episode
""" """
@ -501,18 +498,6 @@ class Episode(ItemBase, TvShowMixin):
airs_before_season, airs_before_episode, playurl, airs_before_season, airs_before_episode, playurl,
path_id, season_id, userdata['UserRating'])) path_id, season_id, userdata['UserRating']))
# Create or update the reference in plex table Add reference is
# idempotent; the call here updates also file_id and path_id when item
# is moved or renamed
self.plex_db.addReference(plex_id,
v.PLEX_TYPE_EPISODE,
kodi_id,
v.KODI_TYPE_EPISODE,
kodi_file_id=file_id,
kodi_pathid=path_id,
parent_id=season_id,
checksum=api.checksum(),
view_id=viewid)
self.kodi_db.modify_people(kodi_id, self.kodi_db.modify_people(kodi_id,
v.KODI_TYPE_EPISODE, v.KODI_TYPE_EPISODE,
api.people_list()) api.people_list())
@ -546,3 +531,13 @@ class Episode(ItemBase, TvShowMixin):
userdata['PlayCount'], userdata['PlayCount'],
userdata['LastPlayedDate'], userdata['LastPlayedDate'],
None) # Do send None - 2nd entry None) # Do send None - 2nd entry
self.plex_db.addReference(plex_id,
v.PLEX_TYPE_EPISODE,
kodi_id,
v.KODI_TYPE_EPISODE,
kodi_file_id=file_id,
kodi_pathid=path_id,
parent_id=season_id,
checksum=api.checksum(),
view_id=section_id,
last_sync=self.last_sync)

View file

@ -2,8 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
import threading import time
import Queue
from . import common, process_metadata, sections from . import common, process_metadata, sections
from .get_metadata import GetMetadataTask from .get_metadata import GetMetadataTask
@ -11,10 +10,6 @@ from .. import utils, backgroundthread, playlists, variables as v, state
from .. import plex_functions as PF, itemtypes from .. import plex_functions as PF, itemtypes
LOG = getLogger('PLEX.library_sync.full_sync') LOG = getLogger('PLEX.library_sync.full_sync')
DOWNLOAD_QUEUE = Queue.Queue(maxsize=500)
PROCESS_QUEUE = Queue.Queue(maxsize=100)
FANARTQUEUE = Queue.Queue()
THREADS = []
def start(repair, callback): def start(repair, callback):
@ -24,42 +19,51 @@ def start(repair, callback):
FullSync(repair, callback).start() FullSync(repair, callback).start()
class FullSync(threading.Thread, common.libsync_mixin): class FullSync(backgroundthread.KillableThread, common.libsync_mixin):
def __init__(self, repair, callback): def __init__(self, repair, callback):
""" """
repair=True: force sync EVERY item repair=True: force sync EVERY item
""" """
self.repair = repair self.repair = repair
self.callback = callback self.callback = callback
self.queue = None
self.process_thread = None
self.last_sync = None
self.plex_db = None
super(FullSync, self).__init__() super(FullSync, self).__init__()
def process_item(self, xml_item, section): def process_item(self, xml_item, get_children):
""" """
Processes a single library item Processes a single library item
""" """
plex_id = xml_item.get('ratingKey') plex_id = int(xml_item['ratingKey'])
if plex_id is None:
# Skipping items 'title=All episodes' without a 'ratingKey'
return
if self.new_items_only: if self.new_items_only:
if self.plex_db.check_plexid(plex_id) is None: if self.plex_db.check_plexid(plex_id) is None:
backgroundthread.BGThreader.addTask( backgroundthread.BGThreader.addTask(
GetMetadataTask().setup(PROCESS_QUEUE, GetMetadataTask().setup(self.queue,
plex_id, plex_id,
section)) get_children))
else: else:
if self.plex_db.check_checksum( if self.plex_db.check_checksum(
'K%s%s' % (plex_id, xml_item.get('updatedAt', ''))) is None: int('%s%s' % (xml_item['ratingKey'],
pass xml_item['updatedAt']))) is None:
backgroundthread.BGThreader.addTask(
GetMetadataTask().setup(self.queue,
plex_id,
get_children))
else:
self.plex_db.update_last_sync(plex_id, self.last_sync)
def plex_movies(self): @utils.log_time
def process_kind(self, kind):
""" """
Syncs movies kind is a tuple: (<name as unicode>,
kodi_type,
<itemtype class>,
get_children)
""" """
LOG.debug('Processing Plex movies') LOG.debug('Start processing %s', kind[0])
sections = (x for x in sections.SECTIONS sections = (x for x in sections.SECTIONS if x['kodi_type'] == kind[1])
if x['kodi_type'] == v.KODI_TYPE_MOVIE)
self.queue = Queue.Queue(maxsize=200)
for section in sections: for section in sections:
LOG.debug('Processing library section %s', section) LOG.debug('Processing library section %s', section)
if self.isCanceled(): if self.isCanceled():
@ -68,43 +72,21 @@ class FullSync(threading.Thread, common.libsync_mixin):
state.PATH_VERIFIED = False state.PATH_VERIFIED = False
try: try:
iterator = PF.PlexSectionItems(section['id']) iterator = PF.PlexSectionItems(section['id'])
t = process_metadata.ProcessMetadata( # Tell the processing thread about this new section
self.queue, queue_info = process_metadata.InitNewSection(
itemtypes.Movie, kind[2],
utils.cast(int, iterator.get('totalSize', 0))) utils.cast(int, iterator.get('totalSize', 0)),
for xml_item in PF.plex_section_items_generator(section['id']): utils.cast(unicode, iterator.get('librarySectionTitle')),
section['id'])
self.queue.put(queue_info)
for xml_item in iterator:
if self.isCanceled(): if self.isCanceled():
return False return False
self.process_item(xml_item, section) self.process_item(xml_item, kind[3])
except RuntimeError: except RuntimeError:
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
return False continue
LOG.debug('Finished processing %s', kind[0])
# Populate self.updatelist and self.all_plex_ids
self.get_updatelist(all_plexmovies,
item_class,
'add_update',
view['name'],
view['id'])
self.process_updatelist(item_class)
# Update viewstate for EVERY item
sections = (x for x in sections.SECTIONS
if x['kodi_type'] == v.KODI_TYPE_MOVIE)
for view in sections:
if self.isCanceled():
return False
self.plex_update_watched(view['id'], item_class)
# PROCESS DELETES #####
if not self.repair:
# Manual sync, process deletes
with itemtypes.Movies() as movie_db:
for kodimovie in self.all_kodi_ids:
if kodimovie not in self.all_plex_ids:
movie_db.remove(kodimovie)
LOG.info("%s sync is finished.", item_class)
return True return True
def full_library_sync(self, new_items_only=False): def full_library_sync(self, new_items_only=False):
@ -113,29 +95,41 @@ class FullSync(threading.Thread, common.libsync_mixin):
process = [self.plex_movies, self.plex_tv_show] process = [self.plex_movies, self.plex_tv_show]
if state.ENABLE_MUSIC: if state.ENABLE_MUSIC:
process.append(self.plex_music) process.append(self.plex_music)
self.queue = backgroundthread.Queue.Queue(maxsize=200)
t = process_metadata.ProcessMetadata(self.queue, self.last_sync)
t.start()
kinds = [
('movies', v.KODI_TYPE_MOVIE, itemtypes.Movie, False),
('tv shows', v.KODI_TYPE_SHOW, itemtypes.Show, False),
('tv seasons', v.KODI_TYPE_SEASON, itemtypes.Season, False),
('tv shows', v.KODI_TYPE_SHOW, itemtypes.Show, False),
]
try:
for kind in kinds:
if self.isCanceled() or not self.process_kind(kind):
return False
# Do the processing # Let kodi update the views in any case, since we're doing a full sync
for kind in process: common.update_kodi_library(video=True, music=state.ENABLE_MUSIC)
if self.isCanceled() or not kind():
return False
# Let kodi update the views in any case, since we're doing a full sync if utils.window('plex_scancrashed') == 'true':
common.update_kodi_library(video=True, music=state.ENABLE_MUSIC) # Show warning if itemtypes.py crashed at some point
utils.messageDialog(utils.lang(29999), utils.lang(39408))
if utils.window('plex_scancrashed') == 'true': utils.window('plex_scancrashed', clear=True)
# Show warning if itemtypes.py crashed at some point elif utils.window('plex_scancrashed') == '401':
utils.messageDialog(utils.lang(29999), utils.lang(39408)) utils.window('plex_scancrashed', clear=True)
utils.window('plex_scancrashed', clear=True) if state.PMS_STATUS not in ('401', 'Auth'):
elif utils.window('plex_scancrashed') == '401': # Plex server had too much and returned ERROR
utils.window('plex_scancrashed', clear=True) utils.messageDialog(utils.lang(29999), utils.lang(39409))
if state.PMS_STATUS not in ('401', 'Auth'): finally:
# Plex server had too much and returned ERROR # Last element will kill the processing thread
utils.messageDialog(utils.lang(29999), utils.lang(39409)) self.queue.put(None)
return True return True
@utils.log_time @utils.log_time
def run(self): def run(self):
successful = False successful = False
self.last_sync = time.time()
try: try:
if self.isCanceled(): if self.isCanceled():
return return

View file

@ -10,6 +10,21 @@ from .. import utils, backgroundthread
LOG = getLogger('PLEX.library_sync.process_metadata') LOG = getLogger('PLEX.library_sync.process_metadata')
class InitNewSection(object):
"""
Throw this into the queue used for ProcessMetadata to tell it which
Plex library section we're looking at
context: itemtypes.Movie, itemtypes.Episode, etc.
"""
def __init__(self, context, total_number_of_items, section_name,
section_id):
self.context = context
self.total = total_number_of_items
self.name = section_name
self.id = section_id
class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin): class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin):
""" """
Not yet implemented for more than 1 thread - if ever. Only to be called by Not yet implemented for more than 1 thread - if ever. Only to be called by
@ -22,12 +37,13 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin):
item_class: as used to call functions in itemtypes.py e.g. 'Movies' => item_class: as used to call functions in itemtypes.py e.g. 'Movies' =>
itemtypes.Movies() itemtypes.Movies()
""" """
def __init__(self, queue, context, total_number_of_items): def __init__(self, queue, last_sync):
self.queue = queue self.queue = queue
self.context = context self.last_sync = last_sync
self.total = total_number_of_items self.total = 0
self.current = 0 self.current = 0
self.title = None self.title = None
self.section_name = None
super(ProcessMetadata, self).__init__() super(ProcessMetadata, self).__init__()
def update_dialog(self): def update_dialog(self):
@ -38,7 +54,7 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin):
except ZeroDivisionError: except ZeroDivisionError:
progress = 0 progress = 0
self.dialog.update(progress, self.dialog.update(progress,
utils.lang(29999), self.section_name,
'%s/%s: %s' '%s/%s: %s'
% (self.current, self.total, self.title)) % (self.current, self.total, self.title))
@ -49,32 +65,42 @@ class ProcessMetadata(backgroundthread.KillableThread, common.libsync_mixin):
LOG.debug('Processing thread started') LOG.debug('Processing thread started')
self.dialog = xbmcgui.DialogProgressBG() self.dialog = xbmcgui.DialogProgressBG()
self.dialog.create(utils.lang(39714)) self.dialog.create(utils.lang(39714))
with self.context() as context: try:
# Init with the very first library section. This will block!
section = self.queue.get()
self.queue.task_done()
if section is None:
return
while self.isCanceled() is False: while self.isCanceled() is False:
# grabs item from queue if section is None:
try:
xml = self.queue.get(block=False)
except backgroundthread.Queue.Empty:
xbmc.sleep(10)
continue
self.queue.task_done()
if xml is None:
break break
try: self.total = section.total
if xml.children is not None: self.section_name = section.name
context.add_update(xml[0], with section.context(self.last_sync) as context:
viewtag=xml['view_name'], while self.isCanceled() is False:
viewid=xml['view_id'], # grabs item from queue
children=xml['children']) try:
else: xml = self.queue.get(block=False)
context.add_update(xml[0], except backgroundthread.Queue.Empty:
viewtag=xml['view_name'], xbmc.sleep(20)
viewid=xml['view_id']) continue
except: self.queue.task_done()
utils.ERROR(txt='process_metadata crashed', notify=True) if xml is InitNewSection or xml is None:
self.current += 1 section = xml
if self.current % 20 == 0: break
self.title = utils.cast(unicode, xml[0].get('title')) try:
self.update_dialog() context.add_update(xml[0],
self.dialog.close() viewtag=section.name,
LOG.debug('Processing thread terminated') viewid=section.id,
children=xml.children)
except:
utils.ERROR(txt='process_metadata crashed',
notify=True)
self.current += 1
if self.current % 20 == 0:
self.title = utils.cast(unicode,
xml[0].get('title'))
self.update_dialog()
finally:
self.dialog.close()
LOG.debug('Processing thread terminated')

View file

@ -224,7 +224,7 @@ class LibrarySync(Thread):
kodi_fileid INTEGER, kodi_fileid INTEGER,
kodi_pathid INTEGER, kodi_pathid INTEGER,
parent_id INTEGER, parent_id INTEGER,
checksum INTEGER, checksum INTEGER UNIQUE,
fanart_synced INTEGER, fanart_synced INTEGER,
last_sync INTEGER) last_sync INTEGER)
''') ''')
@ -236,14 +236,11 @@ class LibrarySync(Thread):
kodi_tagid INTEGER, kodi_tagid INTEGER,
sync_to_kodi INTEGER) sync_to_kodi INTEGER)
''') ''')
plex_db.plexcursor.execute('''
CREATE TABLE IF NOT EXISTS version(idVersion TEXT)
''')
plex_db.plexcursor.execute(''' plex_db.plexcursor.execute('''
CREATE TABLE IF NOT EXISTS playlists( CREATE TABLE IF NOT EXISTS playlists(
plex_id PRIMARY KEY, plex_id INTEGER PRIMARY KEY ASC,
plex_name TEXT, plex_name TEXT,
plex_updatedat TEXT, plex_updatedat INTEGER,
kodi_path TEXT, kodi_path TEXT,
kodi_type TEXT, kodi_type TEXT,
kodi_hash TEXT) kodi_hash TEXT)

View file

@ -37,6 +37,7 @@ from urllib import urlencode, unquote, quote
from urlparse import parse_qsl from urlparse import parse_qsl
from xbmcgui import ListItem from xbmcgui import ListItem
from .utils import cast
from .downloadutils import DownloadUtils as DU from .downloadutils import DownloadUtils as DU
from . import clientinfo from . import clientinfo
from . import utils from . import utils
@ -103,27 +104,26 @@ class API(object):
def updated_at(self): def updated_at(self):
""" """
Returns the last time this item was updated as unicode, e.g. Returns the last time this item was updated as an int, e.g.
'1524739868', or None 1524739868 or None
""" """
return self.item.get('updatedAt') return cast(int, self.item.get('updatedAt'))
def checksum(self): def checksum(self):
""" """
Returns a string, not int. Returns the unique int <ratingKey><updatedAt> or None if this failes
WATCH OUT - time in Plex, not Kodi ;-)
""" """
# Include a letter to prohibit saving as an int! try:
return "K%s%s" % (self.plex_id(), self.item.get('updatedAt', '')) return cast(int, '%s%s' % (self.item.get('ratingKey'),
self.item.get('updatedAt')))
except ValueError:
pass
def plex_id(self): def plex_id(self):
""" """
Returns the Plex ratingKey such as 246922 as an integer or None Returns the Plex ratingKey such as 246922 as an integer or None
""" """
try: return cast(int, self.item.get('ratingKey'))
return int(self.item.get('ratingKey'))
except TypeError, ValueError:
pass
def path(self, force_first_media=True, force_addon=False, def path(self, force_first_media=True, force_addon=False,
direct_paths=None): direct_paths=None):

View file

@ -544,7 +544,8 @@ class DownloadGen(object):
def _download_chunk(self): def _download_chunk(self):
args = { args = {
'X-Plex-Container-Size': CONTAINERSIZE, 'X-Plex-Container-Size': CONTAINERSIZE,
'X-Plex-Container-Start': self._pos 'X-Plex-Container-Start': self._pos,
'sort': 'id'
} }
self.xml = DU().downloadUrl(self._url, parameters=args) self.xml = DU().downloadUrl(self._url, parameters=args)
try: try:
@ -604,7 +605,8 @@ def DownloadChunks(url):
while error_counter < 10: while error_counter < 10:
args = { args = {
'X-Plex-Container-Size': CONTAINERSIZE, 'X-Plex-Container-Size': CONTAINERSIZE,
'X-Plex-Container-Start': pos 'X-Plex-Container-Start': pos,
'sort': 'id'
} }
xmlpart = DU().downloadUrl(url + urlencode(args)) xmlpart = DU().downloadUrl(url + urlencode(args))
# If something went wrong - skip in the hope that it works next time # If something went wrong - skip in the hope that it works next time

View file

@ -214,6 +214,13 @@ class Plex_DB_Functions():
(checksum, )) (checksum, ))
return self.plexcursor.fetchone() return self.plexcursor.fetchone()
def update_last_sync(self, plex_id, last_sync):
"""
Fast method that updates Plex table with last_sync (an int) for plex_id
"""
self.plexcursor.execute('UPDATE plex SET last_sync = ? WHERE plex_id = ?',
(last_sync, plex_id, ))
def checksum(self, plex_type): def checksum(self, plex_type):
""" """
Returns a list of tuples (plex_id, checksum) for plex_type Returns a list of tuples (plex_id, checksum) for plex_type
@ -228,19 +235,21 @@ class Plex_DB_Functions():
def addReference(self, plex_id, plex_type, kodi_id, kodi_type, def addReference(self, plex_id, plex_type, kodi_id, kodi_type,
kodi_fileid=None, kodi_pathid=None, parent_id=None, kodi_fileid=None, kodi_pathid=None, parent_id=None,
checksum=None, view_id=None): checksum=None, section_id=None, last_sync=None):
""" """
Appends or replaces an entry into the plex table Appends or replaces an entry into the plex table
""" """
query = ''' query = '''
INSERT OR REPLACE INTO plex( INSERT OR REPLACE INTO plex(
plex_id, kodi_id, kodi_fileid, kodi_pathid, plex_type, plex_id, kodi_id, kodi_fileid, kodi_pathid, plex_type,
kodi_type, parent_id, checksum, view_id, fanart_synced) kodi_type, parent_id, checksum, section_id, fanart_synced,
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) last_sync)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''' '''
self.plexcursor.execute(query, (plex_id, kodi_id, kodi_fileid, self.plexcursor.execute(query, (plex_id, kodi_id, kodi_fileid,
kodi_pathid, plex_type, kodi_type, kodi_pathid, plex_type, kodi_type,
parent_id, checksum, view_id, 0)) parent_id, checksum, section_id, 0,
last_sync))
def updateReference(self, plex_id, checksum): def updateReference(self, plex_id, checksum):
""" """