Merge pull request #1075 from croneter/improve-sync

Rewire library sync to speed it up and fix sync getting stuck in rare cases
This commit is contained in:
croneter 2019-12-08 10:31:13 +01:00 committed by GitHub
commit c3bad7c954
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 659 additions and 464 deletions

View file

@ -2,12 +2,15 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
from time import time as _time
import threading import threading
import Queue import Queue
import heapq import heapq
from collections import deque
import xbmc import xbmc
from . import utils, app from . import utils, app, variables as v
LOG = getLogger('PLEX.threads') LOG = getLogger('PLEX.threads')
@ -36,8 +39,8 @@ class KillableThread(threading.Thread):
""" """
self._canceled = True self._canceled = True
# Make sure thread is running in order to exit quickly # Make sure thread is running in order to exit quickly
self._is_not_suspended.set()
self._is_not_asleep.set() self._is_not_asleep.set()
self._is_not_suspended.set()
def should_suspend(self): def should_suspend(self):
""" """
@ -66,8 +69,8 @@ class KillableThread(threading.Thread):
back to life back to life
""" """
self._suspended = False self._suspended = False
self._is_not_suspended.set()
self._is_not_asleep.set() self._is_not_asleep.set()
self._is_not_suspended.set()
def wait_while_suspended(self): def wait_while_suspended(self):
""" """
@ -104,6 +107,166 @@ class KillableThread(threading.Thread):
return not self._is_not_asleep.is_set() return not self._is_not_asleep.is_set()
class ProcessingQueue(Queue.Queue, object):
"""
Queue of queues that processes a queue completely before moving on to the
next queue. There's one queue per Section(). You need to initialize each
section with add_section(section) first.
Put tuples (count, item) into this queue, with count being the respective
position of the item in the queue, starting with 0 (zero).
(None, None) is the sentinel for a single queue being exhausted, added by
put_sentinel()
"""
def _init(self, maxsize):
self.queue = deque()
self._sections = deque()
self._queues = deque()
self._current_section = None
self._current_queue = None
self._counter = 0
def _qsize(self):
return self._current_queue._qsize() if self._current_queue else 0
def total_size(self):
"""
Return the approximate total size of all queues (not reliable!)
"""
self.mutex.acquire()
n = sum(q._qsize() for q in self._queues) if self._queues else 0
self.mutex.release()
return n
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
# Use >= instead of == due to OrderedQueue!
if self._qsize() >= self.maxsize:
raise Queue.Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = _time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Queue.Full
self.not_full.wait(remaining)
if self._put(item) == 0:
# Only notify one waiting thread if this item is put into the
# current queue
self.not_empty.notify()
else:
# Be sure to signal not_empty only once!
self._unlock_after_section_change()
self.unfinished_tasks += 1
finally:
self.not_full.release()
def _put(self, item):
"""
Returns the index of the section in whose subqueue we need to put the
item into
"""
for i, section in enumerate(self._sections):
if item[1]['section'] == section:
self._queues[i]._put(item)
break
else:
raise RuntimeError('Could not find section for item %s' % item[1])
return i
def _unlock_after_section_change(self):
"""
Ugly work-around if we expected more items to be synced, but we had
to lower our section.number_of_items because PKC decided that nothing
changed and we don't need to sync the respective item(s).
get() thus might block indefinitely
"""
while (self._current_section and
self._counter == self._current_section.number_of_items):
LOG.debug('Signaling completion of current section')
self._init_next_section()
if self._current_queue and self._current_queue._qsize():
LOG.debug('Signaling not_empty')
self.not_empty.notify()
def put_sentinel(self, section):
"""
Adds a new empty section as a sentinel. Call with an empty Section()
object.
Once the get()-method returns None, you've received the sentinel and
you've thus exhausted the queue
"""
self.not_empty.acquire()
try:
section.number_of_items = 1
self._add_section(section)
# Add the actual sentinel to the queue we just added
self._queues[-1]._put((None, None))
self.unfinished_tasks += 1
if len(self._queues) == 1:
# queue was already exhausted!
self._switch_queues()
self._counter = 0
self.not_empty.notify()
else:
self._unlock_after_section_change()
finally:
self.not_empty.release()
def add_section(self, section):
"""
Be sure to add all sections first before starting to pop items off this
queue or adding them to the queue
"""
self.mutex.acquire()
try:
self._add_section(section)
finally:
self.mutex.release()
def _add_section(self, section):
self._sections.append(section)
self._queues.append(
OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM
else Queue.Queue())
if self._current_section is None:
self._switch_queues()
def _init_next_section(self):
self._sections.popleft()
self._queues.popleft()
self._counter = 0
self._switch_queues()
def _switch_queues(self):
self._current_section = self._sections[0] if self._sections else None
self._current_queue = self._queues[0] if self._queues else None
def _get(self):
item = self._current_queue._get()
self._counter += 1
if self._counter == self._current_section.number_of_items:
self._init_next_section()
return item[1]
class OrderedQueue(Queue.PriorityQueue, object): class OrderedQueue(Queue.PriorityQueue, object):
""" """
Queue that enforces an order on the items it returns. An item you push Queue that enforces an order on the items it returns. An item you push
@ -111,58 +274,21 @@ class OrderedQueue(Queue.PriorityQueue, object):
(index, item) (index, item)
where index=-1 is the item that will be returned first. The Queue will block where index=-1 is the item that will be returned first. The Queue will block
until index=-1, 0, 1, 2, 3, ... is then made available until index=-1, 0, 1, 2, 3, ... is then made available
maxsize will be rather fuzzy, as _qsize returns 0 if we're still waiting
for the next smalles index. put() thus might not block always when it
should.
""" """
def __init__(self, maxsize=0): def __init__(self, maxsize=0):
self.next_index = 0
super(OrderedQueue, self).__init__(maxsize) super(OrderedQueue, self).__init__(maxsize)
self.smallest = -1
self.not_next_item = threading.Condition(self.mutex)
def _put(self, item, heappush=heapq.heappush): def _qsize(self, len=len):
heappush(self.queue, item) return len(self.queue) if self.queue[0][0] == self.next_index else 0
if item[0] == self.smallest:
self.not_next_item.notify()
def get(self, block=True, timeout=None): def _get(self, heappop=heapq.heappop):
"""Remove and return an item from the queue. self.next_index += 1
return heappop(self.queue)
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until an item is available. If 'timeout' is
a non-negative number, it blocks at most 'timeout' seconds and raises
the Empty exception if no item was available within that time.
Otherwise ('block' is false), return an item if one is immediately
available, else raise the Empty exception ('timeout' is ignored
in that case).
"""
self.not_empty.acquire()
try:
if not block:
if not self._qsize() or self.queue[0][0] != self.smallest:
raise Queue.Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
while self.queue[0][0] != self.smallest:
self.not_next_item.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = Queue._time() + timeout
while not self._qsize():
remaining = endtime - Queue._time()
if remaining <= 0.0:
raise Queue.Empty
self.not_empty.wait(remaining)
while self.queue[0][0] != self.smallest:
remaining = endtime - Queue._time()
if remaining <= 0.0:
raise Queue.Empty
self.not_next_item.wait(remaining)
item = self._get()
self.smallest += 1
self.not_full.notify()
return item
finally:
self.not_empty.release()
class Tasks(list): class Tasks(list):

View file

@ -74,6 +74,8 @@ def connect(media_type=None, wal_mode=True):
""" """
if media_type == "plex": if media_type == "plex":
db_path = v.DB_PLEX_PATH db_path = v.DB_PLEX_PATH
elif media_type == 'plex-copy':
db_path = v.DB_PLEX_COPY_PATH
elif media_type == "music": elif media_type == "music":
db_path = v.DB_MUSIC_PATH db_path = v.DB_MUSIC_PATH
elif media_type == "texture": elif media_type == "texture":

View file

@ -9,6 +9,20 @@ PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and
utils.settings('enablePlaylistSync') == 'true') utils.settings('enablePlaylistSync') == 'true')
class LibrarySyncMixin(object):
def suspend(self, block=False, timeout=None):
"""
Let's NOT suspend sync threads but immediately terminate them
"""
self.cancel()
def wait_while_suspended(self):
"""
Return immediately
"""
return self.should_cancel()
def update_kodi_library(video=True, music=True): def update_kodi_library(video=True, music=True):
""" """
Updates the Kodi library and thus refreshes the Kodi views and widgets Updates the Kodi library and thus refreshes the Kodi views and widgets

View file

@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
from . import common
from ..plex_db import PlexDB
from .. import backgroundthread, app
LOG = getLogger('PLEX.sync.fill_metadata_queue')
class FillMetadataQueue(common.LibrarySyncMixin,
backgroundthread.KillableThread, ):
"""
Threaded download of Plex XML metadata for a certain library item.
Fills the queue with the downloaded etree XML objects. Will use a COPIED
plex.db file (plex-copy.db) in order to read much faster without the
writing thread stalling
"""
def __init__(self, repair, section_queue, get_metadata_queue):
self.repair = repair
self.section_queue = section_queue
self.get_metadata_queue = get_metadata_queue
super(FillMetadataQueue, self).__init__()
def _process_section(self, section):
# Initialize only once to avoid loosing the last value before we're
# breaking the for loop
LOG.debug('Process section %s with %s items',
section, section.number_of_items)
count = 0
with PlexDB(lock=False, copy=True) as plexdb:
for xml in section.iterator:
if self.should_cancel():
break
plex_id = int(xml.get('ratingKey'))
checksum = int('{}{}'.format(
plex_id,
xml.get('updatedAt',
xml.get('addedAt', '1541572987'))))
if (not self.repair and
plexdb.checksum(plex_id, section.plex_type) == checksum):
continue
self.get_metadata_queue.put((count, plex_id, section))
count += 1
# We might have received LESS items from the PMS than anticipated.
# Ensures that our queues finish
section.number_of_items = count
def run(self):
LOG.debug('Starting %s thread', self.__class__.__name__)
app.APP.register_thread(self)
try:
while not self.should_cancel():
section = self.section_queue.get()
self.section_queue.task_done()
if section is None:
break
self._process_section(section)
except Exception:
from .. import utils
utils.ERROR(notify=True)
finally:
# Signal the download metadata threads to stop with a sentinel
self.get_metadata_queue.put(None)
app.APP.deregister_thread(self)
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)

View file

@ -3,15 +3,15 @@
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
import Queue import Queue
import copy
import xbmcgui import xbmcgui
from .get_metadata import GetMetadataTask, reset_collections from .get_metadata import GetMetadataThread
from .fill_metadata_queue import FillMetadataQueue
from .process_metadata import ProcessMetadataThread
from . import common, sections from . import common, sections
from .. import utils, timing, backgroundthread, variables as v, app from .. import utils, timing, backgroundthread, variables as v, app
from .. import plex_functions as PF, itemtypes from .. import plex_functions as PF, itemtypes, path_ops
from ..plex_db import PlexDB
if common.PLAYLIST_SYNC_ENABLED: if common.PLAYLIST_SYNC_ENABLED:
from .. import playlists from .. import playlists
@ -19,222 +19,117 @@ if common.PLAYLIST_SYNC_ENABLED:
LOG = getLogger('PLEX.sync.full_sync') LOG = getLogger('PLEX.sync.full_sync')
# How many items will be put through the processing chain at once? # How many items will be put through the processing chain at once?
BATCH_SIZE = 2000 BATCH_SIZE = 250
# Size of queue for xmls to be downloaded from PMS for/and before processing
QUEUE_BUFFER = 50
# Max number of xmls held in memory
MAX_QUEUE_SIZE = 500
# Safety margin to filter PMS items - how many seconds to look into the past? # Safety margin to filter PMS items - how many seconds to look into the past?
UPDATED_AT_SAFETY = 60 * 5 UPDATED_AT_SAFETY = 60 * 5
LAST_VIEWED_AT_SAFETY = 60 * 5 LAST_VIEWED_AT_SAFETY = 60 * 5
class InitNewSection(object): class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
"""
Throw this into the queue used for ProcessMetadata to tell it which
Plex library section we're looking at
"""
def __init__(self, context, total_number_of_items, section_name,
section_id, plex_type):
self.context = context
self.total = total_number_of_items
self.name = section_name
self.id = section_id
self.plex_type = plex_type
class FullSync(backgroundthread.KillableThread):
def __init__(self, repair, callback, show_dialog): def __init__(self, repair, callback, show_dialog):
""" """
repair=True: force sync EVERY item repair=True: force sync EVERY item
""" """
self.repair = repair self.repair = repair
self.callback = callback self.callback = callback
self.queue = None
self.process_thread = None
self.current_sync = None
self.plexdb = None
self.plex_type = None
self.section_type = None
self.worker_count = int(utils.settings('syncThreadNumber'))
self.item_count = 0
# For progress dialog # For progress dialog
self.show_dialog = show_dialog self.show_dialog = show_dialog
self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true' self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true'
self.dialog = None if self.show_dialog:
self.total = 0 self.dialog = xbmcgui.DialogProgressBG()
self.current = 0 self.dialog.create(utils.lang(39714))
self.processed = 0 else:
self.title = '' self.dialog = None
self.section = None
self.section_name = None self.section_queue = Queue.Queue()
self.section_type_text = None self.get_metadata_queue = Queue.Queue(maxsize=5000)
self.context = None self.processing_queue = backgroundthread.ProcessingQueue(maxsize=500)
self.get_children = None self.current_time = timing.plex_now()
self.successful = None self.last_section = sections.Section()
self.section_success = None
self.successful = True
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
self.threader = backgroundthread.ThreaderManager( self.threads = [
worker=backgroundthread.NonstoppingBackgroundWorker, GetMetadataThread(self.get_metadata_queue, self.processing_queue)
worker_count=self.worker_count) for _ in range(int(utils.settings('syncThreadNumber')))
]
for t in self.threads:
t.start()
super(FullSync, self).__init__() super(FullSync, self).__init__()
def suspend(self, block=False, timeout=None): def update_progressbar(self, section, title, current):
""" if not self.dialog:
Let's NOT suspend sync threads but immediately terminate them
"""
self.cancel()
def update_progressbar(self):
if self.dialog:
try:
progress = int(float(self.current) / float(self.total) * 100.0)
except ZeroDivisionError:
progress = 0
self.dialog.update(progress,
'%s (%s)' % (self.section_name, self.section_type_text),
'%s %s/%s'
% (self.title, self.current, self.total))
if app.APP.is_playing_video:
self.dialog.close()
self.dialog = None
def process_item(self, xml_item):
"""
Processes a single library item
"""
plex_id = int(xml_item.get('ratingKey'))
if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \
int('%s%s' % (plex_id,
xml_item.get('updatedAt',
xml_item.get('addedAt', 1541572987)))):
return return
self.threader.addTask(GetMetadataTask(self.queue, current += 1
plex_id,
self.plex_type,
self.get_children,
self.item_count))
self.item_count += 1
def update_library(self):
LOG.debug('Writing changes to Kodi library now')
i = 0
if not self.section:
_, self.section = self.queue.get()
self.queue.task_done()
while not self.should_cancel() and self.item_count > 0:
section = self.section
if not section:
break
LOG.debug('Start or continue processing section %s (%ss)',
section.name, section.plex_type)
self.processed = 0
self.total = section.total
self.section_name = section.name
self.section_type_text = utils.lang(
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
with section.context(self.current_sync) as context:
while not self.should_cancel() and self.item_count > 0:
try:
_, item = self.queue.get(block=False)
except backgroundthread.Queue.Empty:
if self.threader.threader.working():
self.sleep(0.02)
continue
else:
# Try again, in case a thread just finished
i += 1
if i == 3:
break
continue
i = 0
self.queue.task_done()
if isinstance(item, dict):
context.add_update(item['xml'][0],
section_name=section.name,
section_id=section.id,
children=item['children'])
self.title = item['xml'][0].get('title')
self.processed += 1
elif isinstance(item, InitNewSection) or item is None:
self.section = item
break
else:
raise ValueError('Unknown type %s' % type(item))
self.item_count -= 1
self.current += 1
self.update_progressbar()
if self.processed == 500:
self.processed = 0
context.commit()
LOG.debug('Done writing changes to Kodi library')
@utils.log_time
def addupdate_section(self, section):
LOG.debug('Processing library section for new or changed items %s',
section)
if not self.install_sync_done:
app.SYNC.path_verified = False
try: try:
# Sync new, updated and deleted items progress = int(float(current) / float(section.number_of_items) * 100.0)
iterator = section.iterator except ZeroDivisionError:
# Tell the processing thread about this new section progress = 0
queue_info = InitNewSection(section.context, self.dialog.update(progress,
iterator.total, '%s (%s)' % (section.name, section.section_type_text),
iterator.get('librarySectionTitle', '%s %s/%s'
iterator.get('title1')), % (title, current, section.number_of_items))
section.section_id, if app.APP.is_playing_video:
section.plex_type) self.dialog.close()
self.queue.put((-1, queue_info)) self.dialog = None
last = True
# To keep track of the item-number in order to kill while loops @staticmethod
self.item_count = 0 def copy_plex_db():
self.current = 0 """
# Initialize only once to avoid loosing the last value before Takes the current plex.db file and copies it to plex-copy.db
# we're breaking the for loop This will allow us to have "concurrent" connections during adding/
loop = common.tag_last(iterator) updating items, increasing sync speed tremendously.
while True: Using the same DB with e.g. WAL mode did not really work out...
# Check Plex DB to see what we need to add/update """
with PlexDB() as self.plexdb: path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH)
for last, xml_item in loop:
if self.should_cancel():
return False
self.process_item(xml_item)
if self.item_count == BATCH_SIZE:
break
# Make sure Plex DB above is closed before adding/updating!
self.update_library()
if last:
break
reset_collections()
return True
except RuntimeError:
LOG.error('Could not entirely process section %s', section)
return False
@utils.log_time @utils.log_time
def processing_loop_new_and_changed_items(self):
LOG.debug('Start working')
scanner_thread = FillMetadataQueue(self.repair,
self.section_queue,
self.get_metadata_queue)
scanner_thread.start()
process_thread = ProcessMetadataThread(self.current_time,
self.processing_queue,
self.update_progressbar)
process_thread.start()
LOG.debug('Waiting for scanner thread to finish up')
scanner_thread.join()
LOG.debug('Waiting for metadata download threads to finish up')
for t in self.threads:
t.join()
LOG.debug('Download metadata threads finished')
# Sentinel for the process_thread once we added everything else
self.processing_queue.put_sentinel(sections.Section())
process_thread.join()
self.successful = process_thread.successful
LOG.debug('threads finished work. successful: %s', self.successful)
@utils.log_time
def processing_loop_playstates(self):
while not self.should_cancel():
section = self.section_queue.get()
self.section_queue.task_done()
if section is None:
break
self.playstate_per_section(section)
def playstate_per_section(self, section): def playstate_per_section(self, section):
LOG.debug('Processing %s playstates for library section %s', LOG.debug('Processing %s playstates for library section %s',
section.iterator.total, section) section.number_of_items, section)
try: try:
# Sync new, updated and deleted items
iterator = section.iterator iterator = section.iterator
# Tell the processing thread about this new section iterator = common.tag_last(iterator)
queue_info = InitNewSection(section.context,
iterator.total,
section.name,
section.section_id,
section.plex_type)
self.queue.put((-1, queue_info))
self.total = iterator.total
self.section_name = section.name
self.section_type_text = utils.lang(
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
self.current = 0
last = True last = True
loop = common.tag_last(iterator) while not self.should_cancel():
while True: with section.context(self.current_time) as itemtype:
with section.context(self.current_sync) as itemtype: for last, xml_item in iterator:
for i, (last, xml_item) in enumerate(loop): section.count += 1
if self.should_cancel():
return False
if not itemtype.update_userdata(xml_item, section.plex_type): if not itemtype.update_userdata(xml_item, section.plex_type):
# Somehow did not sync this item yet # Somehow did not sync this item yet
itemtype.add_update(xml_item, itemtype.add_update(xml_item,
@ -242,22 +137,21 @@ class FullSync(backgroundthread.KillableThread):
section_id=section.section_id) section_id=section.section_id)
itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']), itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']),
section.plex_type, section.plex_type,
self.current_sync) self.current_time)
self.current += 1 self.update_progressbar(section, '', section.count)
self.update_progressbar() if section.count % (10 * BATCH_SIZE) == 0:
if (i + 1) % (10 * BATCH_SIZE) == 0:
break break
if last: if last:
break break
return True
except RuntimeError: except RuntimeError:
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
return False self.successful = False
def threaded_get_iterators(self, kinds, queue, all_items=False): def get_generators(self, kinds, queue, all_items):
""" """
Getting iterators is costly, so let's do it asynchronously Getting iterators is costly, so let's do it asynchronously
""" """
LOG.debug('Start get_generators')
try: try:
for kind in kinds: for kind in kinds:
for section in (x for x in app.SYNC.sections for section in (x for x in app.SYNC.sections
@ -268,86 +162,55 @@ class FullSync(backgroundthread.KillableThread):
if not section.sync_to_kodi: if not section.sync_to_kodi:
LOG.info('User chose to not sync section %s', section) LOG.info('User chose to not sync section %s', section)
continue continue
element = copy.deepcopy(section) section = sections.get_sync_section(section,
element.plex_type = kind[0] plex_type=kind[0])
element.section_type = element.plex_type
element.context = kind[2]
element.get_children = kind[3]
element.Queue = kind[4]
if self.repair or all_items: if self.repair or all_items:
updated_at = None updated_at = None
else: else:
updated_at = section.last_sync - UPDATED_AT_SAFETY \ updated_at = section.last_sync - UPDATED_AT_SAFETY \
if section.last_sync else None if section.last_sync else None
try: try:
element.iterator = PF.get_section_iterator( section.iterator = PF.get_section_iterator(
section.section_id, section.section_id,
plex_type=element.plex_type, plex_type=section.plex_type,
updated_at=updated_at, updated_at=updated_at,
last_viewed_at=None) last_viewed_at=None)
except RuntimeError: except RuntimeError:
LOG.warn('Sync at least partially unsuccessful') LOG.error('Sync at least partially unsuccessful!')
self.successful = False LOG.error('Error getting section iterator %s', section)
self.section_success = False
else: else:
queue.put(element) section.number_of_items = section.iterator.total
if section.number_of_items > 0:
self.processing_queue.add_section(section)
queue.put(section)
LOG.debug('Put section in queue with %s items: %s',
section.number_of_items, section)
except Exception: except Exception:
utils.ERROR(notify=True) utils.ERROR(notify=True)
finally: finally:
queue.put(None) queue.put(None)
LOG.debug('Exiting get_generators')
def full_library_sync(self): def full_library_sync(self):
"""
"""
# structure:
# (plex_type,
# section_type,
# context for itemtype,
# download children items, e.g. songs for a specific album?,
# Queue)
kinds = [ kinds = [
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE, itemtypes.Movie, False, Queue.Queue), (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE),
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW, itemtypes.Show, False, Queue.Queue), (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW),
(v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW, itemtypes.Season, False, Queue.Queue), (v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW),
(v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW, itemtypes.Episode, False, Queue.Queue) (v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW)
] ]
if app.SYNC.enable_music: if app.SYNC.enable_music:
kinds.extend([ kinds.extend([
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False, Queue.Queue), (v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST),
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, backgroundthread.OrderedQueue), (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST),
]) ])
# ADD NEW ITEMS # ADD NEW ITEMS
# Already start setting up the iterators. We need to enforce # We need to enforce syncing e.g. show before season before episode
# syncing e.g. show before season before episode self.get_generators(kinds, self.section_queue, False)
iterator_queue = Queue.Queue() # Do the heavy lifting
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, self.processing_loop_new_and_changed_items()
None,
kinds,
iterator_queue)
backgroundthread.BGThreader.addTask(task)
while True:
self.section_success = True
section = iterator_queue.get()
iterator_queue.task_done()
if section is None:
break
# Setup our variables
self.plex_type = section.plex_type
self.section_type = section.section_type
self.context = section.context
self.get_children = section.get_children
self.queue = section.Queue()
# Now do the heavy lifting
if self.should_cancel() or not self.addupdate_section(section):
return False
if self.section_success:
# Need to check because a thread might have missed to get
# some items from the PMS
with PlexDB() as plexdb:
# Set the new time mark for the next delta sync
plexdb.update_section_last_sync(section.section_id,
self.current_sync)
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
if self.should_cancel() or not self.successful:
return
# Sync Plex playlists to Kodi and vice-versa # Sync Plex playlists to Kodi and vice-versa
if common.PLAYLIST_SYNC_ENABLED: if common.PLAYLIST_SYNC_ENABLED:
@ -357,48 +220,27 @@ class FullSync(backgroundthread.KillableThread):
self.dialog = xbmcgui.DialogProgressBG() self.dialog = xbmcgui.DialogProgressBG()
# "Synching playlists" # "Synching playlists"
self.dialog.create(utils.lang(39715)) self.dialog.create(utils.lang(39715))
if not playlists.full_sync(): if not playlists.full_sync() or self.should_cancel():
return False return
# SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that # SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that
# were set to unwatched). Also mark all items on the PMS to be able # were set to unwatched). Also mark all items on the PMS to be able
# to delete the ones still in Kodi # to delete the ones still in Kodi
LOG.info('Start synching playstate and userdata for every item') LOG.debug('Start synching playstate and userdata for every item')
# In order to not delete all your songs again
if app.SYNC.enable_music: if app.SYNC.enable_music:
# We don't need to enforce the album order now # In order to not delete all your songs again
kinds.pop(5)
kinds.extend([ kinds.extend([
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, Queue.Queue), (v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST),
(v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST, itemtypes.Song, True, Queue.Queue),
]) ])
# Make sure we're not showing an item's title in the sync dialog # Make sure we're not showing an item's title in the sync dialog
self.title = ''
self.threader.shutdown()
self.threader = None
if not self.show_dialog_userdata and self.dialog: if not self.show_dialog_userdata and self.dialog:
# Close the progress indicator dialog # Close the progress indicator dialog
self.dialog.close() self.dialog.close()
self.dialog = None self.dialog = None
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, self.get_generators(kinds, self.section_queue, True)
None, self.processing_loop_playstates()
kinds, if self.should_cancel() or not self.successful:
iterator_queue, return
all_items=True)
backgroundthread.BGThreader.addTask(task)
while True:
section = iterator_queue.get()
iterator_queue.task_done()
if section is None:
break
# Setup our variables
self.plex_type = section.plex_type
self.section_type = section.section_type
self.context = section.context
self.get_children = section.get_children
# Now do the heavy lifting
if self.should_cancel() or not self.playstate_per_section(section):
return False
# Delete movies that are not on Plex anymore # Delete movies that are not on Plex anymore
LOG.debug('Looking for items to delete') LOG.debug('Looking for items to delete')
@ -417,60 +259,50 @@ class FullSync(backgroundthread.KillableThread):
for plex_type, context in kinds: for plex_type, context in kinds:
# Delete movies that are not on Plex anymore # Delete movies that are not on Plex anymore
while True: while True:
with context(self.current_sync) as ctx: with context(self.current_time) as ctx:
plex_ids = list(ctx.plexdb.plex_id_by_last_sync(plex_type, plex_ids = list(
self.current_sync, ctx.plexdb.plex_id_by_last_sync(plex_type,
BATCH_SIZE)) self.current_time,
BATCH_SIZE))
for plex_id in plex_ids: for plex_id in plex_ids:
if self.should_cancel(): if self.should_cancel():
return False return
ctx.remove(plex_id, plex_type) ctx.remove(plex_id, plex_type)
if len(plex_ids) < BATCH_SIZE: if len(plex_ids) < BATCH_SIZE:
break break
LOG.debug('Done deleting') LOG.debug('Done looking for items to delete')
return True
def run(self): def run(self):
app.APP.register_thread(self) app.APP.register_thread(self)
LOG.info('Running library sync with repair=%s', self.repair)
try: try:
self._run() self.run_full_library_sync()
except Exception:
utils.ERROR(notify=True)
self.successful = False
finally: finally:
app.APP.deregister_thread(self) app.APP.deregister_thread(self)
LOG.info('Done full_sync') LOG.info('Library sync done. successful: %s', self.successful)
@utils.log_time @utils.log_time
def _run(self): def run_full_library_sync(self):
self.current_sync = timing.plex_now()
# Get latest Plex libraries and build playlist and video node files
if self.should_cancel() or not sections.sync_from_pms(self):
return
self.successful = True
try: try:
if self.show_dialog: # Get latest Plex libraries and build playlist and video node files
self.dialog = xbmcgui.DialogProgressBG() if self.should_cancel() or not sections.sync_from_pms(self):
self.dialog.create(utils.lang(39714))
# Actual syncing - do only new items first
LOG.info('Running full_library_sync with repair=%s',
self.repair)
if self.should_cancel() or not self.full_library_sync():
self.successful = False
return return
self.copy_plex_db()
self.full_library_sync()
finally: finally:
common.update_kodi_library(video=True, music=True) common.update_kodi_library(video=True, music=True)
if self.dialog: if self.dialog:
self.dialog.close() self.dialog.close()
if self.threader:
self.threader.shutdown()
self.threader = None
if not self.successful and not self.should_cancel(): if not self.successful and not self.should_cancel():
# "ERROR in library sync" # "ERROR in library sync"
utils.dialog('notification', utils.dialog('notification',
heading='{plex}', heading='{plex}',
message=utils.lang(39410), message=utils.lang(39410),
icon='{error}') icon='{error}')
if self.callback: self.callback(self.successful)
self.callback(self.successful)
def start(show_dialog, repair=False, callback=None): def start(show_dialog, repair=False, callback=None):

View file

@ -2,73 +2,46 @@
from __future__ import absolute_import, division, unicode_literals from __future__ import absolute_import, division, unicode_literals
from logging import getLogger from logging import getLogger
from . import common
from ..plex_api import API from ..plex_api import API
from .. import plex_functions as PF, backgroundthread, utils, variables as v from .. import backgroundthread, plex_functions as PF, utils, variables as v
from .. import app
LOG = getLogger("PLEX." + __name__)
LOG = getLogger('PLEX.sync.get_metadata')
LOCK = backgroundthread.threading.Lock() LOCK = backgroundthread.threading.Lock()
# List of tuples: (collection index [as in an item's metadata with "Collection
# id"], collection plex id)
COLLECTION_MATCH = None
# Dict with entries of the form <collection index>: <collection xml>
COLLECTION_XMLS = {}
def reset_collections(): class GetMetadataThread(common.LibrarySyncMixin,
""" backgroundthread.KillableThread):
Collections seem unique to Plex sections
"""
global LOCK, COLLECTION_MATCH, COLLECTION_XMLS
with LOCK:
COLLECTION_MATCH = None
COLLECTION_XMLS = {}
class GetMetadataTask(backgroundthread.Task):
""" """
Threaded download of Plex XML metadata for a certain library item. Threaded download of Plex XML metadata for a certain library item.
Fills the queue with the downloaded etree XML objects Fills the queue with the downloaded etree XML objects
Input:
queue Queue.Queue() object where this thread will store
the downloaded metadata XMLs as etree objects
""" """
def __init__(self, queue, plex_id, plex_type, get_children=False, def __init__(self, get_metadata_queue, processing_queue):
count=None): self.get_metadata_queue = get_metadata_queue
self.queue = queue self.processing_queue = processing_queue
self.plex_id = plex_id super(GetMetadataThread, self).__init__()
self.plex_type = plex_type
self.get_children = get_children
self.count = count
super(GetMetadataTask, self).__init__()
def suspend(self, block=False, timeout=None):
"""
Let's NOT suspend sync threads but immediately terminate them
"""
self.cancel()
def _collections(self, item): def _collections(self, item):
global COLLECTION_MATCH, COLLECTION_XMLS
api = API(item['xml'][0]) api = API(item['xml'][0])
if COLLECTION_MATCH is None: collection_match = item['section'].collection_match
COLLECTION_MATCH = PF.collections(api.library_section_id()) collection_xmls = item['section'].collection_xmls
if COLLECTION_MATCH is None: if collection_match is None:
collection_match = PF.collections(api.library_section_id())
if collection_match is None:
LOG.error('Could not download collections') LOG.error('Could not download collections')
return return
# Extract what we need to know # Extract what we need to know
COLLECTION_MATCH = \ collection_match = \
[(utils.cast(int, x.get('index')), [(utils.cast(int, x.get('index')),
utils.cast(int, x.get('ratingKey'))) for x in COLLECTION_MATCH] utils.cast(int, x.get('ratingKey'))) for x in collection_match]
item['children'] = {} item['children'] = {}
for plex_set_id, set_name in api.collections(): for plex_set_id, set_name in api.collections():
if self.should_cancel(): if self.should_cancel():
return return
if plex_set_id not in COLLECTION_XMLS: if plex_set_id not in collection_xmls:
# Get Plex metadata for collections - a pain # Get Plex metadata for collections - a pain
for index, collection_plex_id in COLLECTION_MATCH: for index, collection_plex_id in collection_match:
if index == plex_set_id: if index == plex_set_id:
collection_xml = PF.GetPlexMetadata(collection_plex_id) collection_xml = PF.GetPlexMetadata(collection_plex_id)
try: try:
@ -77,54 +50,84 @@ class GetMetadataTask(backgroundthread.Task):
LOG.error('Could not get collection %s %s', LOG.error('Could not get collection %s %s',
collection_plex_id, set_name) collection_plex_id, set_name)
continue continue
COLLECTION_XMLS[plex_set_id] = collection_xml collection_xmls[plex_set_id] = collection_xml
break break
else: else:
LOG.error('Did not find Plex collection %s %s', LOG.error('Did not find Plex collection %s %s',
plex_set_id, set_name) plex_set_id, set_name)
continue continue
item['children'][plex_set_id] = COLLECTION_XMLS[plex_set_id] item['children'][plex_set_id] = collection_xmls[plex_set_id]
def _process_abort(self, count, section):
# Make sure other threads will also receive sentinel
self.get_metadata_queue.put(None)
if count is not None:
self._process_skipped_item(count, section)
def _process_skipped_item(self, count, section):
section.sync_successful = False
# Add a "dummy" item so we're not skipping a beat
self.processing_queue.put((count, {'section': section, 'xml': None}))
def run(self): def run(self):
""" LOG.debug('Starting %s thread', self.__class__.__name__)
Do the work app.APP.register_thread(self)
""" try:
if self.should_cancel(): self._run()
return finally:
# Download Metadata app.APP.deregister_thread(self)
item = { LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
'xml': PF.GetPlexMetadata(self.plex_id),
'children': None def _run(self):
} while True:
if item['xml'] is None: item = self.get_metadata_queue.get()
# Did not receive a valid XML - skip that item for now
LOG.error("Could not get metadata for %s. Skipping that item "
"for now", self.plex_id)
return
elif item['xml'] == 401:
LOG.error('HTTP 401 returned by PMS. Too much strain? '
'Cancelling sync for now')
utils.window('plex_scancrashed', value='401')
return
if not self.should_cancel() and self.plex_type == v.PLEX_TYPE_MOVIE:
# Check for collections/sets
collections = False
for child in item['xml'][0]:
if child.tag == 'Collection':
collections = True
break
if collections:
global LOCK
with LOCK:
self._collections(item)
if not self.should_cancel() and self.get_children:
children_xml = PF.GetAllPlexChildren(self.plex_id)
try: try:
children_xml[0].attrib if item is None or self.should_cancel():
except (TypeError, IndexError, AttributeError): self._process_abort(item[0] if item else None,
LOG.error('Could not get children for Plex id %s', item[2] if item else None)
self.plex_id) break
else: count, plex_id, section = item
item['children'] = children_xml item = {
if not self.should_cancel(): 'xml': PF.GetPlexMetadata(plex_id), # This will block
self.queue.put((self.count, item)) 'children': None,
'section': section
}
if item['xml'] is None:
# Did not receive a valid XML - skip that item for now
LOG.error("Could not get metadata for %s. Skipping item "
"for now", plex_id)
self._process_skipped_item(count, section)
continue
elif item['xml'] == 401:
LOG.error('HTTP 401 returned by PMS. Too much strain? '
'Cancelling sync for now')
utils.window('plex_scancrashed', value='401')
self._process_abort(count, section)
break
if section.plex_type == v.PLEX_TYPE_MOVIE:
# Check for collections/sets
collections = False
for child in item['xml'][0]:
if child.tag == 'Collection':
collections = True
break
if collections:
with LOCK:
self._collections(item)
if section.get_children:
if self.should_cancel():
self._process_abort(count, section)
break
children_xml = PF.GetAllPlexChildren(plex_id) # Will block
try:
children_xml[0].attrib
except (TypeError, IndexError, AttributeError):
LOG.error('Could not get children for Plex id %s',
plex_id)
self._process_skipped_item(count, section)
continue
else:
item['children'] = children_xml
self.processing_queue.put((count, item))
finally:
self.get_metadata_queue.task_done()

View file

@ -0,0 +1,104 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, unicode_literals
from logging import getLogger
from . import common, sections
from ..plex_db import PlexDB
from .. import backgroundthread, app
LOG = getLogger('PLEX.sync.process_metadata')
COMMIT_TO_DB_EVERY_X_ITEMS = 500
class ProcessMetadataThread(common.LibrarySyncMixin,
backgroundthread.KillableThread):
"""
Invoke once in order to process the received PMS metadata xmls
"""
def __init__(self, current_time, processing_queue, update_progressbar):
self.current_time = current_time
self.processing_queue = processing_queue
self.update_progressbar = update_progressbar
self.last_section = sections.Section()
self.successful = True
super(ProcessMetadataThread, self).__init__()
def start_section(self, section):
if section != self.last_section:
if self.last_section:
self.finish_last_section()
LOG.debug('Start or continue processing section %s', section)
self.last_section = section
# Warn the user for this new section if we cannot access a file
app.SYNC.path_verified = False
else:
LOG.debug('Resume processing section %s', section)
def finish_last_section(self):
if (not self.should_cancel() and self.last_section and
self.last_section.sync_successful):
# Check for should_cancel() because we cannot be sure that we
# processed every item of the section
with PlexDB() as plexdb:
# Set the new time mark for the next delta sync
plexdb.update_section_last_sync(self.last_section.section_id,
self.current_time)
LOG.info('Finished processing section successfully: %s',
self.last_section)
elif self.last_section and not self.last_section.sync_successful:
LOG.warn('Sync not successful for section %s', self.last_section)
self.successful = False
def _get(self):
item = {'xml': None}
while not self.should_cancel() and item and item['xml'] is None:
item = self.processing_queue.get()
self.processing_queue.task_done()
return item
def run(self):
LOG.debug('Starting %s thread', self.__class__.__name__)
app.APP.register_thread(self)
try:
self._run()
except Exception:
from .. import utils
utils.ERROR(notify=True)
finally:
app.APP.deregister_thread(self)
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
def _run(self):
# There are 2 sentinels: None for aborting/ending this thread, the dict
# {'section': section, 'xml': None} for skipped/invalid items
item = self._get()
if item:
section = item['section']
processed = 0
self.start_section(section)
while not self.should_cancel():
if item is None:
break
elif item['section'] != section:
# We received an entirely new section
self.start_section(item['section'])
section = item['section']
with section.context(self.current_time) as context:
while not self.should_cancel():
if item is None or item['section'] != section:
break
self.update_progressbar(section,
item['xml'][0].get('title'),
section.count)
context.add_update(item['xml'][0],
section_name=section.name,
section_id=section.section_id,
children=item['children'])
processed += 1
section.count += 1
if processed == COMMIT_TO_DB_EVERY_X_ITEMS:
processed = 0
context.commit()
item = self._get()
self.finish_last_section()

View file

@ -54,6 +54,8 @@ class Section(object):
self.content = None # unicode self.content = None # unicode
# Setting the section_type WILL re_set sync_to_kodi! # Setting the section_type WILL re_set sync_to_kodi!
self._section_type = None # unicode self._section_type = None # unicode
# E.g. "season" or "movie" (translated)
self.section_type_text = None
# Do we sync all items of this section to the Kodi DB? # Do we sync all items of this section to the Kodi DB?
# This will be set with section_type!! # This will be set with section_type!!
self.sync_to_kodi = None # bool self.sync_to_kodi = None # bool
@ -77,13 +79,9 @@ class Section(object):
self.order = None self.order = None
# Original PMS xml for this section, including children # Original PMS xml for this section, including children
self.xml = None self.xml = None
# Attributes that will be initialized later by full_sync.py
self.iterator = None
self.context = None
self.get_children = None
# A section_type encompasses possible several plex_types! E.g. shows # A section_type encompasses possible several plex_types! E.g. shows
# contain shows, seasons, episodes # contain shows, seasons, episodes
self.plex_type = None self._plex_type = None
if xml_element is not None: if xml_element is not None:
self.from_xml(xml_element) self.from_xml(xml_element)
elif section_db_element: elif section_db_element:
@ -106,9 +104,14 @@ class Section(object):
self.section_type is not None) self.section_type is not None)
def __eq__(self, section): def __eq__(self, section):
"""
Sections compare equal if their section_id, name and plex_type (first
prio) OR section_type (if there is no plex_type is set) compare equal
"""
return (self.section_id == section.section_id and return (self.section_id == section.section_id and
self.name == section.name and self.name == section.name and
self.section_type == section.section_type) (self.plex_type == section.plex_type if self.plex_type else
self.section_type == section.section_type))
def __ne__(self, section): def __ne__(self, section):
return not self == section return not self == section
@ -140,6 +143,15 @@ class Section(object):
else: else:
self.sync_to_kodi = True self.sync_to_kodi = True
@property
def plex_type(self):
return self._plex_type
@plex_type.setter
def plex_type(self, value):
self._plex_type = value
self.section_type_text = utils.lang(v.TRANSLATION_FROM_PLEXTYPE[value])
@property @property
def index(self): def index(self):
return self._index return self._index
@ -431,6 +443,39 @@ class Section(object):
self.remove_from_plex() self.remove_from_plex()
def _get_children(plex_type):
if plex_type == v.PLEX_TYPE_ALBUM:
return True
else:
return False
def get_sync_section(section, plex_type):
"""
Deep-copies section and adds certain arguments in order to prep section
for the library sync
"""
section = copy.deepcopy(section)
section.plex_type = plex_type
section.context = itemtypes.ITEMTYPE_FROM_PLEXTYPE[plex_type]
section.get_children = _get_children(plex_type)
# Some more init stuff
# Has sync for this section been successful?
section.sync_successful = True
# List of tuples: (collection index [as in an item's metadata with
# "Collection id"], collection plex id)
section.collection_match = None
# Dict with entries of the form <collection index>: <collection xml>
section.collection_xmls = {}
# Keep count during sync
section.count = 0
# Total number of items that we need to sync
section.number_of_items = 0
# Iterator to get one sync item after the other
section.iterator = None
return section
def force_full_sync(): def force_full_sync():
""" """
Resets the sync timestamp for all sections to 0, thus forcing a subsequent Resets the sync timestamp for all sections to 0, thus forcing a subsequent

View file

@ -20,18 +20,19 @@ SUPPORTED_KODI_TYPES = (
class PlexDBBase(object): class PlexDBBase(object):
""" """
Plex database methods used for all types of items Plex database methods used for all types of items.
""" """
def __init__(self, plexconn=None, lock=True): def __init__(self, plexconn=None, lock=True, copy=False):
# Allows us to use this class with a cursor instead of context mgr # Allows us to use this class with a cursor instead of context mgr
self.plexconn = plexconn self.plexconn = plexconn
self.cursor = self.plexconn.cursor() if self.plexconn else None self.cursor = self.plexconn.cursor() if self.plexconn else None
self.lock = lock self.lock = lock
self.copy = copy
def __enter__(self): def __enter__(self):
if self.lock: if self.lock:
PLEXDB_LOCK.acquire() PLEXDB_LOCK.acquire()
self.plexconn = db.connect('plex') self.plexconn = db.connect('plex-copy' if self.copy else 'plex')
self.cursor = self.plexconn.cursor() self.cursor = self.plexconn.cursor()
return self return self

View file

@ -127,6 +127,7 @@ DB_MUSIC_PATH = None
DB_TEXTURE_VERSION = None DB_TEXTURE_VERSION = None
DB_TEXTURE_PATH = None DB_TEXTURE_PATH = None
DB_PLEX_PATH = try_decode(xbmc.translatePath("special://database/plex.db")) DB_PLEX_PATH = try_decode(xbmc.translatePath("special://database/plex.db"))
DB_PLEX_COPY_PATH = try_decode(xbmc.translatePath("special://database/plex-copy.db"))
EXTERNAL_SUBTITLE_TEMP_PATH = try_decode(xbmc.translatePath( EXTERNAL_SUBTITLE_TEMP_PATH = try_decode(xbmc.translatePath(
"special://profile/addon_data/%s/temp/" % ADDON_ID)) "special://profile/addon_data/%s/temp/" % ADDON_ID))