PlexKodiConnect/resources/lib/library_sync/full_sync.py

472 lines
20 KiB
Python
Raw Normal View History

2018-10-20 23:49:04 +11:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
import Queue
import copy
2018-12-09 23:02:08 +11:00
import xbmcgui
from .get_metadata import GetMetadataTask, reset_collections
from . import common, sections
2018-11-19 00:59:17 +11:00
from .. import utils, timing, backgroundthread, variables as v, app
2018-10-20 23:49:04 +11:00
from .. import plex_functions as PF, itemtypes
2018-10-23 22:54:09 +11:00
from ..plex_db import PlexDB
2018-10-20 23:49:04 +11:00
2019-02-03 01:49:21 +11:00
if common.PLAYLIST_SYNC_ENABLED:
2018-10-23 22:54:09 +11:00
from .. import playlists
2019-02-03 01:49:21 +11:00
2018-10-20 23:49:04 +11:00
2018-11-02 01:43:43 +11:00
LOG = getLogger('PLEX.sync.full_sync')

# Number of items that are pushed through the processing chain in one batch
BATCH_SIZE = 2000
# Safety margins, in seconds, to look into the past when filtering PMS items
UPDATED_AT_SAFETY = 5 * 60
LAST_VIEWED_AT_SAFETY = 5 * 60
class InitNewSection(object):
    """
    Marker that is put into the processing queue to announce which Plex
    library section the items following it belong to.

    Attributes (all copied verbatim from the constructor arguments):
        context     callable used as `with context(current_sync) as ctx:`
                    by the consumer of the queue
        total       total number of items of the section
        name        name of the section
        id          id of the section
        plex_type   Plex type of the items of the section
    """
    def __init__(self, context, total_number_of_items, section_name,
                 section_id, plex_type):
        self.plex_type = plex_type
        self.id = section_id
        self.name = section_name
        self.total = total_number_of_items
        self.context = context
2018-10-20 23:49:04 +11:00
class FullSync(common.fullsync_mixin):
2018-10-24 16:08:32 +11:00
def __init__(self, repair, callback, show_dialog):
    """
    repair=True: force sync EVERY item
    callback:    if set, called with the success flag once the sync is done
    show_dialog: whether to show a background progress dialog while syncing
    """
    self.repair = repair
    self.callback = callback
    # Working state; filled in while the individual sections are synced
    self.queue = self.process_thread = self.current_sync = None
    self.plexdb = self.plex_type = self.section_type = None
    self.worker_count = int(utils.settings('syncThreadNumber'))
    # Number of items handed to the workers but not yet written to Kodi
    self.item_count = 0
    # For progress dialog
    self.show_dialog = show_dialog
    self.show_dialog_userdata = \
        utils.settings('playstate_sync_indicator') == 'true'
    self.dialog = None
    self.total = self.current = self.processed = 0
    self.title = ''
    self.section = self.section_name = self.section_type_text = None
    self.context = self.get_children = None
    # Outcome flags; set once the sync is actually running
    self.successful = self.section_success = None
    self.install_sync_done = \
        utils.settings('SyncInstallRunDone') == 'true'
    # Pool of background workers used to download item metadata
    self.threader = backgroundthread.ThreaderManager(
        worker=backgroundthread.NonstoppingBackgroundWorker,
        worker_count=self.worker_count)
    super(FullSync, self).__init__()
2018-12-09 23:02:08 +11:00
def update_progressbar(self):
    """
    Refresh the progress dialog (if there is one) with the current section,
    item title and item counters. The dialog is closed and dropped as soon
    as a video is playing.
    """
    if not self.dialog:
        return
    try:
        ratio = float(self.current) / float(self.total)
    except ZeroDivisionError:
        # Total not (yet) known
        percent = 0
    else:
        percent = int(ratio * 100.0)
    heading = '%s (%s)' % (self.section_name, self.section_type_text)
    message = '%s %s/%s' % (self.title, self.current, self.total)
    self.dialog.update(percent, heading, message)
    if app.APP.is_playing_video:
        self.dialog.close()
        self.dialog = None
2018-12-09 23:02:08 +11:00
2018-10-22 01:56:13 +11:00
def process_item(self, xml_item):
    """
    Processes a single library item: unless we are repairing, items whose
    checksum in the Plex DB still matches are skipped. Everything else is
    handed to the worker threads as a GetMetadataTask and counted in
    self.item_count.
    """
    plex_id = int(xml_item.get('ratingKey'))
    if not self.repair:
        known = self.plexdb.checksum(plex_id, self.plex_type)
        # Checksum is the plex_id with the item's timestamp appended
        stamp = xml_item.get('updatedAt',
                             xml_item.get('addedAt', 1541572987))
        if known == int('%s%s' % (plex_id, stamp)):
            # Item is unchanged - nothing to do
            return
    task = GetMetadataTask(self.queue,
                           plex_id,
                           self.plex_type,
                           self.get_children,
                           self.item_count)
    self.threader.addTask(task)
    self.item_count += 1
def update_library(self):
    """
    Drain self.queue and write the items found there to the Kodi library.

    Every queue entry is a 2-tuple whose second element is one of:
        dict            with keys 'xml' and 'children': an item to add or
                        update using the context of the current section
        InitNewSection  switch over to a new library section
        None            nothing left to process
    Anything else raises ValueError.

    Changes are committed after every 500 processed items. The method
    returns once self.item_count items were written, the sync got canceled,
    or the queue stayed empty 3 times in a row while no worker was busy.
    """
    LOG.debug('Writing changes to Kodi library now')
    # Consecutive polls that found the queue empty while all workers idled
    idle_polls = 0
    # Set once we stop waiting for items that are not going to arrive.
    # Without this flag the outer loop would re-enter the inner one forever
    # (idle_polls would never be equal to 3 again) and busy-spin
    gave_up = False
    if not self.section:
        _, self.section = self.queue.get()
        self.queue.task_done()
    while not gave_up and not self.isCanceled() and self.item_count > 0:
        section = self.section
        if not section:
            break
        LOG.debug('Start or continue processing section %s (%ss)',
                  section.name, section.plex_type)
        self.processed = 0
        self.total = section.total
        self.section_name = section.name
        self.section_type_text = utils.lang(
            v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
        with section.context(self.current_sync) as context:
            while not self.isCanceled() and self.item_count > 0:
                try:
                    _, item = self.queue.get(block=False)
                except backgroundthread.Queue.Empty:
                    if self.threader.threader.working():
                        # Workers are still busy; give them some time
                        app.APP.monitor.waitForAbort(0.02)
                        continue
                    # Try again, in case a thread just finished
                    idle_polls += 1
                    if idle_polls == 3:
                        LOG.warning('Giving up waiting for %s item(s) that '
                                    'were never delivered',
                                    self.item_count)
                        gave_up = True
                        break
                    continue
                idle_polls = 0
                self.queue.task_done()
                if isinstance(item, dict):
                    context.add_update(item['xml'][0],
                                       section_name=section.name,
                                       section_id=section.id,
                                       children=item['children'])
                    self.title = item['xml'][0].get('title')
                    self.processed += 1
                elif isinstance(item, InitNewSection) or item is None:
                    # Leave the current context before switching sections
                    self.section = item
                    break
                else:
                    raise ValueError('Unknown type %s' % type(item))
                self.item_count -= 1
                self.current += 1
                self.update_progressbar()
                if self.processed == 500:
                    self.processed = 0
                    context.commit()
    LOG.debug('Done writing changes to Kodi library')
2018-10-22 01:56:13 +11:00
2018-10-21 21:03:21 +11:00
@utils.log_time
def addupdate_section(self, section):
    """
    Add or update the new or changed items of one library section.

    The items of section.iterator are walked in batches of BATCH_SIZE: each
    batch is first checked against the Plex DB (process_item) and then
    written to Kodi (update_library).

    Returns True if the section was processed, False if the sync got
    canceled or a RuntimeError was raised on the way.
    """
    LOG.debug('Processing library section for new or changed items %s',
              section)
    if not self.install_sync_done:
        app.SYNC.path_verified = False
    try:
        pms_items = section.iterator
        # Tell the processing loop about this new section
        announcement = InitNewSection(
            section.context,
            pms_items.total,
            pms_items.get('librarySectionTitle', pms_items.get('title1')),
            section.section_id,
            section.plex_type)
        self.queue.put((-1, announcement))
        last = True
        # Counters that let the processing loops know when they are done
        self.item_count = 0
        self.current = 0
        # Create this only once: it must survive breaking out of the for
        # loop below so that we continue with the next batch
        tagged_items = common.tag_last(pms_items)
        finished = False
        while not finished:
            # Check Plex DB to see what we need to add/update
            with PlexDB() as self.plexdb:
                for last, xml_item in tagged_items:
                    if self.isCanceled():
                        return False
                    self.process_item(xml_item)
                    if self.item_count == BATCH_SIZE:
                        break
            # Make sure Plex DB above is closed before adding/updating!
            self.update_library()
            finished = last
        reset_collections()
        return True
    except RuntimeError:
        LOG.error('Could not entirely process section %s', section)
        return False
2018-12-09 23:02:08 +11:00
@utils.log_time
def playstate_per_section(self, section):
    """
    Sync the userdata (playstates) of every item of one library section and
    stamp each item with the current sync time in the Plex DB.

    Items whose userdata cannot be updated are added/updated in full. The
    item context is left and re-entered every 10 * BATCH_SIZE items.

    Returns True if the section was processed, False if the sync got
    canceled or a RuntimeError was raised on the way.
    """
    LOG.debug('Processing %s playstates for library section %s',
              section.iterator.total, section)
    try:
        pms_items = section.iterator
        # Announce the section on the queue
        announcement = InitNewSection(section.context,
                                      pms_items.total,
                                      section.name,
                                      section.section_id,
                                      section.plex_type)
        self.queue.put((-1, announcement))
        # Values shown by the progress dialog
        self.total = pms_items.total
        self.section_name = section.name
        self.section_type_text = utils.lang(
            v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
        self.current = 0
        last = True
        chunk_size = 10 * BATCH_SIZE
        tagged_items = common.tag_last(pms_items)
        while True:
            with section.context(self.current_sync) as itemtype:
                for count, (last, xml_item) in enumerate(tagged_items, 1):
                    if self.isCanceled():
                        return False
                    updated = itemtype.update_userdata(xml_item,
                                                       section.plex_type)
                    if not updated:
                        # Somehow did not sync this item yet
                        itemtype.add_update(xml_item,
                                            section_name=section.name,
                                            section_id=section.section_id)
                    plex_id = int(xml_item.attrib['ratingKey'])
                    itemtype.plexdb.update_last_sync(plex_id,
                                                     section.plex_type,
                                                     self.current_sync)
                    self.current += 1
                    self.update_progressbar()
                    if count % chunk_size == 0:
                        # Leave the context before doing the next chunk
                        break
            if last:
                break
        return True
    except RuntimeError:
        LOG.error('Could not entirely process section %s', section)
        return False
def threaded_get_iterators(self, kinds, queue, all_items=False):
    """
    Getting iterators is costly, so let's do it asynchronously

    For every kind in kinds (see full_library_sync for the tuple layout) and
    every matching library section that the user wants synced, a copy of
    the section is equipped with its PMS iterator and put into queue.
    Sections whose iterator cannot be fetched are skipped and the sync is
    flagged as unsuccessful. A final None is ALWAYS put into queue to tell
    the consumer that nothing else will follow.

    all_items=True: do not restrict the iterators to recently updated items
    """
    try:
        for kind in kinds:
            plex_type, section_type, context, get_children, queue_class = \
                kind[:5]
            for section in app.SYNC.sections:
                if section.section_type != section_type:
                    continue
                if self.isCanceled():
                    LOG.debug('Need to exit now')
                    return
                if not section.sync_to_kodi:
                    LOG.info('User chose to not sync section %s', section)
                    continue
                # Work on a copy; the same section is used for several kinds
                element = copy.deepcopy(section)
                element.plex_type = plex_type
                element.section_type = element.plex_type
                element.context = context
                element.get_children = get_children
                element.Queue = queue_class
                if self.repair or all_items or not section.last_sync:
                    updated_at = None
                else:
                    updated_at = section.last_sync - UPDATED_AT_SAFETY
                try:
                    element.iterator = PF.get_section_iterator(
                        section.section_id,
                        plex_type=element.plex_type,
                        updated_at=updated_at,
                        last_viewed_at=None)
                except RuntimeError:
                    LOG.warn('Sync at least partially unsuccessful')
                    self.successful = False
                    self.section_success = False
                else:
                    queue.put(element)
    except Exception:
        utils.ERROR(notify=True)
    finally:
        queue.put(None)
2018-10-20 23:49:04 +11:00
2018-10-22 01:56:13 +11:00
def full_library_sync(self):
    """
    Run the 3 phases of a full library sync:
        1. add/update new or changed items, section by section
           (plus the playlist sync, if enabled)
        2. sync playstate and userdata of ALL items, which also stamps
           every item still on the PMS with the current sync time
        3. delete the items from Kodi that did not get that stamp

    Phase 3 is skipped if not every section could be fetched from the PMS
    in phase 2, as the items of such a section would all look deleted.

    Returns False if the sync got canceled or a phase failed, else True.
    """
    # Read the setting only once: all phases must agree on whether music
    # is part of this sync, even if the user changes the setting meanwhile
    enable_music = app.SYNC.enable_music
    # structure:
    # (plex_type,
    #  section_type,
    #  context for itemtype,
    #  download children items, e.g. songs for a specific album?,
    #  Queue)
    kinds = [
        (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE,
         itemtypes.Movie, False, Queue.Queue),
        (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW,
         itemtypes.Show, False, Queue.Queue),
        (v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW,
         itemtypes.Season, False, Queue.Queue),
        (v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW,
         itemtypes.Episode, False, Queue.Queue)
    ]
    if enable_music:
        kinds.extend([
            (v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST,
             itemtypes.Artist, False, Queue.Queue),
            (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST,
             itemtypes.Album, True, backgroundthread.OrderedQueue),
        ])
    # ADD NEW ITEMS
    # Already start setting up the iterators. We need to enforce
    # syncing e.g. show before season before episode
    iterator_queue = Queue.Queue()
    task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
                                           None,
                                           kinds,
                                           iterator_queue)
    backgroundthread.BGThreader.addTask(task)
    while True:
        self.section_success = True
        section = iterator_queue.get()
        iterator_queue.task_done()
        if section is None:
            # The iterator task is done; no more sections will follow
            break
        # Setup our variables
        self.plex_type = section.plex_type
        self.section_type = section.section_type
        self.context = section.context
        self.get_children = section.get_children
        self.queue = section.Queue()
        # Now do the heavy lifting
        if self.isCanceled() or not self.addupdate_section(section):
            return False
        if self.section_success:
            # Need to check because a thread might have missed to get
            # some items from the PMS
            with PlexDB() as plexdb:
                # Set the new time mark for the next delta sync
                plexdb.update_section_last_sync(section.section_id,
                                                self.current_sync)
    common.update_kodi_library(video=True, music=True)
    # Sync Plex playlists to Kodi and vice-versa
    if common.PLAYLIST_SYNC_ENABLED:
        if self.show_dialog:
            if self.dialog:
                self.dialog.close()
            self.dialog = xbmcgui.DialogProgressBG()
            # "Synching playlists"
            self.dialog.create(utils.lang(39715))
        if not playlists.full_sync():
            return False
    # SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that
    # were set to unwatched). Also mark all items on the PMS to be able
    # to delete the ones still in Kodi
    LOG.info('Start synching playstate and userdata for every item')
    # In order to not delete all your songs again
    if enable_music:
        # We don't need to enforce the album order now: replace the album
        # entry using the OrderedQueue with a plain one and add the songs
        kinds = [kind for kind in kinds if kind[0] != v.PLEX_TYPE_ALBUM]
        kinds.extend([
            (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST,
             itemtypes.Album, True, Queue.Queue),
            (v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST,
             itemtypes.Song, True, Queue.Queue),
        ])
    # Make sure we're not showing an item's title in the sync dialog
    self.title = ''
    # No more metadata downloads from here on
    self.threader.shutdown()
    self.threader = None
    if not self.show_dialog_userdata and self.dialog:
        # Close the progress indicator dialog
        self.dialog.close()
        self.dialog = None
    # From here on this flag tells us whether every section could be
    # fetched for the playstate phase; threaded_get_iterators resets it
    self.section_success = True
    task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
                                           None,
                                           kinds,
                                           iterator_queue,
                                           all_items=True)
    backgroundthread.BGThreader.addTask(task)
    while True:
        section = iterator_queue.get()
        iterator_queue.task_done()
        if section is None:
            break
        # Setup our variables
        self.plex_type = section.plex_type
        self.section_type = section.section_type
        self.context = section.context
        self.get_children = section.get_children
        # Now do the heavy lifting
        if self.isCanceled() or not self.playstate_per_section(section):
            return False
    if not self.section_success:
        # Items of a section we could not fetch were never stamped with the
        # current sync time. Deleting now would wipe that entire section
        LOG.warning('Not every library section could be fetched from the '
                    'PMS - not deleting any items this time')
        return True
    # Delete items that are not on Plex anymore
    LOG.debug('Looking for items to delete')
    delete_kinds = [
        (v.PLEX_TYPE_MOVIE, itemtypes.Movie),
        (v.PLEX_TYPE_SHOW, itemtypes.Show),
        (v.PLEX_TYPE_SEASON, itemtypes.Season),
        (v.PLEX_TYPE_EPISODE, itemtypes.Episode)
    ]
    if enable_music:
        delete_kinds.extend([
            (v.PLEX_TYPE_ARTIST, itemtypes.Artist),
            (v.PLEX_TYPE_ALBUM, itemtypes.Album),
            (v.PLEX_TYPE_SONG, itemtypes.Song)
        ])
    for plex_type, context in delete_kinds:
        # Work in batches of BATCH_SIZE, one context per batch
        while True:
            with context(self.current_sync) as ctx:
                plex_ids = list(
                    ctx.plexdb.plex_id_by_last_sync(plex_type,
                                                    self.current_sync,
                                                    BATCH_SIZE))
                for plex_id in plex_ids:
                    if self.isCanceled():
                        return False
                    ctx.remove(plex_id, plex_type)
            if len(plex_ids) < BATCH_SIZE:
                # That was the last batch for this plex_type
                break
    LOG.debug('Done deleting')
    return True
def run(self):
    """
    Do the full sync. This sync is registered with the app as a thread for
    as long as the sync is running.
    """
    app.APP.register_thread(self)
    try:
        self._run()
    finally:
        # Deregister no matter how _run() ended
        app.APP.deregister_thread(self)
        LOG.info('Done full_sync')
@utils.log_time
def _run(self):
    """
    Sync the library sections from the PMS, then run full_library_sync().

    Whatever happens, the cleanup is done afterwards: the Kodi library is
    refreshed, the progress dialog closed, the worker threads shut down,
    an error notification shown if the sync failed without having been
    canceled and self.callback (if set) called with self.successful.
    """
    self.current_sync = timing.plex_now()
    # Assume failure until proven otherwise so that every early exit below
    # reports the correct result to the callback
    self.successful = False
    try:
        # Get latest Plex libraries and build playlist and video node files.
        # This check is inside the try block so that the worker threads get
        # shut down and the callback gets called even if we bail out here
        if self.isCanceled() or not sections.sync_from_pms(self):
            return
        self.successful = True
        if self.show_dialog:
            self.dialog = xbmcgui.DialogProgressBG()
            self.dialog.create(utils.lang(39714))
        # Actual syncing - do only new items first
        LOG.info('Running full_library_sync with repair=%s',
                 self.repair)
        if self.isCanceled() or not self.full_library_sync():
            self.successful = False
            return
    except Exception:
        # Do not report a crashed sync as successful to the callback
        self.successful = False
        raise
    finally:
        common.update_kodi_library(video=True, music=True)
        if self.dialog:
            self.dialog.close()
        if self.threader:
            self.threader.shutdown()
            self.threader = None
        if not self.successful and not self.isCanceled():
            # "ERROR in library sync"
            utils.dialog('notification',
                         heading='{plex}',
                         message=utils.lang(39410),
                         icon='{error}')
        if self.callback:
            self.callback(self.successful)
2018-10-20 23:49:04 +11:00
2018-10-24 16:08:32 +11:00
def start(show_dialog, repair=False, callback=None):
    """
    Set up a FullSync and run it right away; returns once run() is done.

    show_dialog: passed on to FullSync
    repair:      True to force a sync of EVERY item
    callback:    passed on to FullSync
    """
    full_sync = FullSync(repair, callback, show_dialog)
    full_sync.run()