PlexKodiConnect/resources/lib/library_sync/full_sync.py

264 lines
10 KiB
Python
Raw Normal View History

2018-10-20 23:49:04 +11:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
import Queue
import copy
from cProfile import Profile
from pstats import Stats
from StringIO import StringIO
2018-10-20 23:49:04 +11:00
from .get_metadata import GetMetadataTask, reset_collections
from .process_metadata import InitNewSection, UpdateLastSync, ProcessMetadata, \
DeleteItem, UpdateUserdata
from . import common, sections
2018-11-19 00:59:17 +11:00
from .. import utils, timing, backgroundthread, variables as v, app
2018-10-20 23:49:04 +11:00
from .. import plex_functions as PF, itemtypes
2018-10-23 22:54:09 +11:00
from ..plex_db import PlexDB
2018-10-20 23:49:04 +11:00
2018-10-23 22:54:09 +11:00
if (v.PLATFORM != 'Microsoft UWP' and
utils.settings('enablePlaylistSync') == 'true'):
# Xbox cannot use watchdog, a dependency for PKC playlist features
from .. import playlists
PLAYLIST_SYNC_ENABLED = True
else:
PLAYLIST_SYNC_ENABLED = False
2018-10-20 23:49:04 +11:00
2018-11-02 01:43:43 +11:00
LOG = getLogger('PLEX.sync.full_sync')
2018-10-20 23:49:04 +11:00
class FullSync(common.libsync_mixin):
2018-10-24 16:08:32 +11:00
def __init__(self, repair, callback, show_dialog):
2018-10-20 23:49:04 +11:00
"""
repair=True: force sync EVERY item
"""
self._canceled = False
2018-10-20 23:49:04 +11:00
self.repair = repair
self.callback = callback
2018-10-24 16:08:32 +11:00
self.show_dialog = show_dialog
2018-10-21 21:03:21 +11:00
self.queue = None
self.process_thread = None
2018-11-05 02:53:42 +11:00
self.current_sync = None
2018-10-29 02:14:37 +11:00
self.plexdb = None
2018-10-22 01:56:13 +11:00
self.plex_type = None
2018-11-05 02:53:42 +11:00
self.section_type = None
2018-10-22 03:32:11 +11:00
self.processing_thread = None
2018-10-25 22:27:12 +11:00
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
self.threader = backgroundthread.ThreaderManager(
worker=backgroundthread.NonstoppingBackgroundWorker)
2018-10-20 23:49:04 +11:00
super(FullSync, self).__init__()
2018-10-22 01:56:13 +11:00
def process_item(self, xml_item):
2018-10-20 23:49:04 +11:00
"""
Processes a single library item
"""
2018-10-26 00:57:12 +11:00
plex_id = int(xml_item.get('ratingKey'))
2018-11-05 02:53:42 +11:00
if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \
int('%s%s' % (plex_id,
xml_item.get('updatedAt',
xml_item.get('addedAt', 1541572987)))):
# Already got EXACTLY this item in our DB. BUT need to collect all
# DB updates within the same thread
self.queue.put(UpdateLastSync(plex_id))
2018-11-05 02:53:42 +11:00
return
2018-10-26 02:50:59 +11:00
task = GetMetadataTask()
task.setup(self.queue, plex_id, self.plex_type, self.get_children)
self.threader.addTask(task)
2018-10-20 23:49:04 +11:00
2018-10-22 01:56:13 +11:00
def process_delete(self):
"""
2018-11-05 02:53:42 +11:00
Removes all the items that have NOT been updated (last_sync timestamp
is different)
2018-10-22 01:56:13 +11:00
"""
for plex_id in self.plexdb.plex_id_by_last_sync(self.plex_type,
self.current_sync):
if self.isCanceled():
return
self.queue.put(DeleteItem(plex_id))
2018-10-22 01:56:13 +11:00
2018-11-05 02:53:42 +11:00
@utils.log_time
def process_playstate(self, iterator):
"""
Updates the playstate (resume point, number of views, userrating, last
played date, etc.) for all elements in the (xml-)iterator
"""
for xml_item in iterator:
if self.isCanceled():
return False
self.queue.put(UpdateUserdata(xml_item))
2018-11-05 02:53:42 +11:00
2018-10-21 21:03:21 +11:00
@utils.log_time
def process_section(self, section):
LOG.debug('Processing library section %s', section)
if self.isCanceled():
return False
if not self.install_sync_done:
app.SYNC.path_verified = False
try:
# Sync new, updated and deleted items
iterator = section['iterator_1']
# Tell the processing thread about this new section
queue_info = InitNewSection(section['context'],
iterator.total,
iterator.get('librarySectionTitle'),
section['section_id'],
section['plex_type'])
self.queue.put(queue_info)
with PlexDB() as self.plexdb:
for xml_item in iterator:
if self.isCanceled():
return False
self.process_item(xml_item)
except RuntimeError:
LOG.error('Could not entirely process section %s', section)
return False
LOG.debug('Waiting for download threads to finish')
while self.threader.threader.working():
app.APP.monitor.waitForAbort(0.1)
LOG.debug('Waiting for processing thread to finish section')
self.queue.join()
reset_collections()
try:
# Sync playstate of every item
iterator = section['iterator_2']
# Tell the processing thread that we're syncing playstate
queue_info = InitNewSection(section['context'],
iterator.total,
iterator.get('librarySectionTitle'),
section['section_id'],
section['plex_type'])
self.queue.put(queue_info)
with PlexDB() as self.plexdb:
if section['plex_type'] != v.PLEX_TYPE_ARTIST:
self.process_playstate(iterator)
# Delete movies that are not on Plex anymore
self.process_delete()
# Wait again till the processing thread is done
self.queue.join()
except RuntimeError:
LOG.error('Could not process playstate for section %s', section)
return False
LOG.debug('Done processing playstate for section')
return True
def threaded_get_iterators(self, kinds, queue):
2018-10-20 23:49:04 +11:00
"""
PF.SectionItems is costly, so let's do it asynchronous
2018-10-20 23:49:04 +11:00
"""
try:
for kind in kinds:
for section in (x for x in sections.SECTIONS
if x['plex_type'] == kind[1]):
if self.isCanceled():
return
element = copy.deepcopy(section)
element['section_type'] = element['plex_type']
element['plex_type'] = kind[0]
element['element_type'] = kind[1]
element['context'] = kind[2]
element['get_children'] = kind[3]
element['iterator_1'] = PF.SectionItems(section['section_id'],
plex_type=kind[0])
element['iterator_2'] = PF.SectionItems(section['section_id'],
plex_type=kind[0])
queue.put(element)
finally:
queue.put(None)
2018-10-20 23:49:04 +11:00
2018-10-22 01:56:13 +11:00
def full_library_sync(self):
2018-10-20 23:49:04 +11:00
"""
"""
2018-10-21 21:03:21 +11:00
kinds = [
2018-11-05 02:53:42 +11:00
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE, itemtypes.Movie, False),
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW, itemtypes.Show, False),
(v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW, itemtypes.Season, False),
(v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW, itemtypes.Episode, False)
2018-10-21 21:03:21 +11:00
]
2018-11-19 00:59:17 +11:00
if app.SYNC.enable_music:
2018-10-25 22:22:34 +11:00
kinds.extend([
2018-11-05 02:53:42 +11:00
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False),
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True),
2018-11-04 04:47:51 +11:00
])
# Already start setting up the iterators. We need to enforce
# syncing e.g. show before season before episode
iterator_queue = Queue.Queue()
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
None,
kinds,
iterator_queue)
backgroundthread.BGThreader.addTask(task)
while True:
section = iterator_queue.get()
if section is None:
break
# Setup our variables
self.plex_type = section['plex_type']
self.section_type = section['section_type']
self.context = section['context']
self.get_children = section['get_children']
# Now do the heavy lifting
if self.isCanceled() or not self.process_section(section):
return False
iterator_queue.task_done()
2018-10-20 23:49:04 +11:00
return True
@utils.log_time
def run(self):
profile = Profile()
profile.enable()
2018-10-22 03:32:11 +11:00
if self.isCanceled():
return
2018-10-23 22:54:09 +11:00
successful = False
2018-11-19 00:59:17 +11:00
self.current_sync = timing.unix_timestamp()
2018-10-24 16:08:32 +11:00
# Delete playlist and video node files from Kodi
utils.delete_playlists()
utils.delete_nodes()
# Get latest Plex libraries and build playlist and video node files
2018-10-22 03:32:11 +11:00
if not sections.sync_from_pms():
return
2018-10-20 23:49:04 +11:00
try:
2018-10-22 03:32:11 +11:00
# Fire up our single processing thread
2018-11-27 05:40:56 +11:00
self.queue = backgroundthread.Queue.Queue(maxsize=1000)
self.processing_thread = ProcessMetadata(self.queue,
self.current_sync,
self.show_dialog)
2018-10-22 03:32:11 +11:00
self.processing_thread.start()
2018-10-23 22:54:09 +11:00
# Actual syncing - do only new items first
2018-11-05 02:53:42 +11:00
LOG.info('Running full_library_sync with repair=%s',
2018-10-23 22:54:09 +11:00
self.repair)
2018-10-20 23:49:04 +11:00
if not self.full_library_sync():
return
2018-11-05 02:53:42 +11:00
# Tell the processing thread to exit with one last element None
self.queue.put(None)
2018-10-20 23:49:04 +11:00
if self.isCanceled():
return
if PLAYLIST_SYNC_ENABLED and not playlists.full_sync():
return
successful = True
except:
utils.ERROR(txt='full_sync.py crashed', notify=True)
finally:
2018-11-05 02:53:42 +11:00
# This will block until the processing thread really exits
2018-10-22 03:32:11 +11:00
LOG.debug('Waiting for processing thread to exit')
self.processing_thread.join()
common.update_kodi_library(video=True, music=True)
self.threader.shutdown()
2018-10-24 16:08:32 +11:00
if self.callback:
self.callback(successful)
2018-10-22 03:32:11 +11:00
LOG.info('Done full_sync')
profile.disable()
string_io = StringIO()
stats = Stats(profile, stream=string_io).sort_stats('cumulative')
stats.print_stats()
LOG.info('cProfile result: ')
LOG.info(string_io.getvalue())
2018-10-20 23:49:04 +11:00
2018-10-24 16:08:32 +11:00
def start(show_dialog, repair=False, callback=None):
2018-10-20 23:49:04 +11:00
"""
"""
# FullSync(repair, callback, show_dialog).start()
FullSync(repair, callback, show_dialog).run()