diff --git a/resources/lib/backgroundthread.py b/resources/lib/backgroundthread.py index 45f7602f..682dcd8f 100644 --- a/resources/lib/backgroundthread.py +++ b/resources/lib/backgroundthread.py @@ -2,12 +2,15 @@ # -*- coding: utf-8 -*- from __future__ import absolute_import, division, unicode_literals from logging import getLogger +from time import time as _time import threading import Queue import heapq +from collections import deque + import xbmc -from . import utils, app +from . import utils, app, variables as v LOG = getLogger('PLEX.threads') @@ -36,8 +39,8 @@ class KillableThread(threading.Thread): """ self._canceled = True # Make sure thread is running in order to exit quickly - self._is_not_suspended.set() self._is_not_asleep.set() + self._is_not_suspended.set() def should_suspend(self): """ @@ -66,8 +69,8 @@ class KillableThread(threading.Thread): back to life """ self._suspended = False - self._is_not_suspended.set() self._is_not_asleep.set() + self._is_not_suspended.set() def wait_while_suspended(self): """ @@ -104,6 +107,166 @@ class KillableThread(threading.Thread): return not self._is_not_asleep.is_set() +class ProcessingQueue(Queue.Queue, object): + """ + Queue of queues that processes a queue completely before moving on to the + next queue. There's one queue per Section(). You need to initialize each + section with add_section(section) first. + Put tuples (count, item) into this queue, with count being the respective + position of the item in the queue, starting with 0 (zero). + (None, None) is the sentinel for a single queue being exhausted, added by + put_sentinel() + """ + def _init(self, maxsize): + self.queue = deque() + self._sections = deque() + self._queues = deque() + self._current_section = None + self._current_queue = None + self._counter = 0 + + def _qsize(self): + return self._current_queue._qsize() if self._current_queue else 0 + + def total_size(self): + """ + Return the approximate total size of all queues (not reliable!) + """ + self.mutex.acquire() + n = sum(q._qsize() for q in self._queues) if self._queues else 0 + self.mutex.release() + return n + + def put(self, item, block=True, timeout=None): + """Put an item into the queue. + + If optional args 'block' is true and 'timeout' is None (the default), + block if necessary until a free slot is available. If 'timeout' is + a non-negative number, it blocks at most 'timeout' seconds and raises + the Full exception if no free slot was available within that time. + Otherwise ('block' is false), put an item on the queue if a free slot + is immediately available, else raise the Full exception ('timeout' + is ignored in that case). + """ + self.not_full.acquire() + try: + if self.maxsize > 0: + if not block: + # Use >= instead of == due to OrderedQueue! + if self._qsize() >= self.maxsize: + raise Queue.Full + elif timeout is None: + while self._qsize() >= self.maxsize: + self.not_full.wait() + elif timeout < 0: + raise ValueError("'timeout' must be a non-negative number") + else: + endtime = _time() + timeout + while self._qsize() >= self.maxsize: + remaining = endtime - _time() + if remaining <= 0.0: + raise Queue.Full + self.not_full.wait(remaining) + if self._put(item) == 0: + # Only notify one waiting thread if this item is put into the + # current queue + self.not_empty.notify() + else: + # Be sure to signal not_empty only once! 
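+                # This item went into one of the upcoming sub-queues, not the
+                # current one, so a blocked get() cannot consume it yet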
+ self._unlock_after_section_change() + self.unfinished_tasks += 1 + finally: + self.not_full.release() + + def _put(self, item): + """ + Returns the index of the section in whose subqueue we need to put the + item into + """ + for i, section in enumerate(self._sections): + if item[1]['section'] == section: + self._queues[i]._put(item) + break + else: + raise RuntimeError('Could not find section for item %s' % item[1]) + return i + + def _unlock_after_section_change(self): + """ + Ugly work-around if we expected more items to be synced, but we had + to lower our section.number_of_items because PKC decided that nothing + changed and we don't need to sync the respective item(s). + get() thus might block indefinitely + """ + while (self._current_section and + self._counter == self._current_section.number_of_items): + LOG.debug('Signaling completion of current section') + self._init_next_section() + if self._current_queue and self._current_queue._qsize(): + LOG.debug('Signaling not_empty') + self.not_empty.notify() + + def put_sentinel(self, section): + """ + Adds a new empty section as a sentinel. Call with an empty Section() + object. + Once the get()-method returns None, you've received the sentinel and + you've thus exhausted the queue + """ + self.not_empty.acquire() + try: + section.number_of_items = 1 + self._add_section(section) + # Add the actual sentinel to the queue we just added + self._queues[-1]._put((None, None)) + self.unfinished_tasks += 1 + if len(self._queues) == 1: + # queue was already exhausted! + self._switch_queues() + self._counter = 0 + self.not_empty.notify() + else: + self._unlock_after_section_change() + finally: + self.not_empty.release() + + def add_section(self, section): + """ + Be sure to add all sections first before starting to pop items off this + queue or adding them to the queue + """ + self.mutex.acquire() + try: + self._add_section(section) + finally: + self.mutex.release() + + def _add_section(self, section): + self._sections.append(section) + self._queues.append( + OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM + else Queue.Queue()) + if self._current_section is None: + self._switch_queues() + + def _init_next_section(self): + self._sections.popleft() + self._queues.popleft() + self._counter = 0 + self._switch_queues() + + def _switch_queues(self): + self._current_section = self._sections[0] if self._sections else None + self._current_queue = self._queues[0] if self._queues else None + + def _get(self): + item = self._current_queue._get() + self._counter += 1 + if self._counter == self._current_section.number_of_items: + self._init_next_section() + return item[1] + + class OrderedQueue(Queue.PriorityQueue, object): """ Queue that enforces an order on the items it returns. An item you push @@ -111,58 +274,21 @@ class OrderedQueue(Queue.PriorityQueue, object): (index, item) where index=-1 is the item that will be returned first. The Queue will block until index=-1, 0, 1, 2, 3, ... is then made available + + maxsize will be rather fuzzy, as _qsize returns 0 if we're still waiting + for the next smalles index. put() thus might not block always when it + should. 
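+    get(), on the other hand, is supposed to block until the item carrying
+    the next expected index has been put into the queue. E.g. if (1, b) is
+    put before (0, a), get() will first return (0, a) and only then (1, b).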
""" def __init__(self, maxsize=0): + self.next_index = 0 super(OrderedQueue, self).__init__(maxsize) - self.smallest = -1 - self.not_next_item = threading.Condition(self.mutex) - def _put(self, item, heappush=heapq.heappush): - heappush(self.queue, item) - if item[0] == self.smallest: - self.not_next_item.notify() + def _qsize(self, len=len): + return len(self.queue) if self.queue[0][0] == self.next_index else 0 - def get(self, block=True, timeout=None): - """Remove and return an item from the queue. - - If optional args 'block' is true and 'timeout' is None (the default), - block if necessary until an item is available. If 'timeout' is - a non-negative number, it blocks at most 'timeout' seconds and raises - the Empty exception if no item was available within that time. - Otherwise ('block' is false), return an item if one is immediately - available, else raise the Empty exception ('timeout' is ignored - in that case). - """ - self.not_empty.acquire() - try: - if not block: - if not self._qsize() or self.queue[0][0] != self.smallest: - raise Queue.Empty - elif timeout is None: - while not self._qsize(): - self.not_empty.wait() - while self.queue[0][0] != self.smallest: - self.not_next_item.wait() - elif timeout < 0: - raise ValueError("'timeout' must be a non-negative number") - else: - endtime = Queue._time() + timeout - while not self._qsize(): - remaining = endtime - Queue._time() - if remaining <= 0.0: - raise Queue.Empty - self.not_empty.wait(remaining) - while self.queue[0][0] != self.smallest: - remaining = endtime - Queue._time() - if remaining <= 0.0: - raise Queue.Empty - self.not_next_item.wait(remaining) - item = self._get() - self.smallest += 1 - self.not_full.notify() - return item - finally: - self.not_empty.release() + def _get(self, heappop=heapq.heappop): + self.next_index += 1 + return heappop(self.queue) class Tasks(list): diff --git a/resources/lib/library_sync/common.py b/resources/lib/library_sync/common.py index c2481956..d6aae66f 100644 --- a/resources/lib/library_sync/common.py +++ b/resources/lib/library_sync/common.py @@ -9,6 +9,20 @@ PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and utils.settings('enablePlaylistSync') == 'true') +class LibrarySyncMixin(object): + def suspend(self, block=False, timeout=None): + """ + Let's NOT suspend sync threads but immediately terminate them + """ + self.cancel() + + def wait_while_suspended(self): + """ + Return immediately + """ + return self.should_cancel() + + def update_kodi_library(video=True, music=True): """ Updates the Kodi library and thus refreshes the Kodi views and widgets diff --git a/resources/lib/library_sync/fill_metadata_queue.py b/resources/lib/library_sync/fill_metadata_queue.py new file mode 100644 index 00000000..c92f7a96 --- /dev/null +++ b/resources/lib/library_sync/fill_metadata_queue.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, division, unicode_literals +from logging import getLogger +import Queue +from collections import deque + +from . import common +from ..plex_db import PlexDB +from .. import backgroundthread, app + +LOG = getLogger('PLEX.sync.fill_metadata_queue') + + +def batch_sizes(): + """ + Increase batch sizes in order to get download threads for an items xml + metadata started soon. 
Corresponds to batch sizes when downloading lists + of items from the PMS ('limitindex' in the PKC settings) + """ + for i in (50, 100, 200, 400): + yield i + while True: + yield 1000 + + +class FillMetadataQueue(common.LibrarySyncMixin, + backgroundthread.KillableThread, ): + """ + Threaded download of Plex XML metadata for a certain library item. + Fills the queue with the downloaded etree XML objects + + Input: + queue Queue.Queue() object where this thread will store + the downloaded metadata XMLs as etree objects + """ + def __init__(self, repair, section_queue, get_metadata_queue): + self.repair = repair + self.section_queue = section_queue + self.get_metadata_queue = get_metadata_queue + self.count = 0 + self.batch_size = batch_sizes() + super(FillMetadataQueue, self).__init__() + + def _loop(self, section, items): + while items and not self.should_cancel(): + try: + with PlexDB(lock=False) as plexdb: + while items and not self.should_cancel(): + last, plex_id, checksum = items.popleft() + if (not self.repair and + plexdb.checksum(plex_id, section.plex_type) == checksum): + continue + if last: + # We might have received LESS items from the PMS + # than anticipated. Ensures that our queues finish + section.number_of_items = self.count + 1 + self.get_metadata_queue.put((self.count, plex_id, section), + block=False) + self.count += 1 + except Queue.Full: + # Close the DB for speed! + LOG.debug('Queue full') + self.sleep(5) + while not self.should_cancel(): + try: + self.get_metadata_queue.put((self.count, plex_id, section), + block=False) + except Queue.Full: + LOG.debug('Queue fuller') + self.sleep(2) + else: + self.count += 1 + break + + def _process_section(self, section): + # Initialize only once to avoid loosing the last value before we're + # breaking the for loop + iterator = common.tag_last(section.iterator) + last = True + self.count = 0 + while not self.should_cancel(): + batch_size = next(self.batch_size) + LOG.debug('Process batch of size %s with count %s for section %s', + batch_size, self.count, section) + # Iterator will block for download - let's not do that when the + # DB connection is open + items = deque() + for i, (last, xml) in enumerate(iterator): + plex_id = int(xml.get('ratingKey')) + checksum = int('{}{}'.format( + plex_id, + xml.get('updatedAt', + xml.get('addedAt', '1541572987')))) + items.append((last, plex_id, checksum)) + if i == batch_size: + break + self._loop(section, items) + if last: + break + + def run(self): + LOG.debug('Starting %s thread', self.__class__.__name__) + app.APP.register_thread(self) + try: + while not self.should_cancel(): + section = self.section_queue.get() + self.section_queue.task_done() + if section is None: + break + self._process_section(section) + except Exception: + from .. 
import utils + utils.ERROR(notify=True) + finally: + # Signal the download metadata threads to stop with a sentinel + self.get_metadata_queue.put(None) + app.APP.deregister_thread(self) + LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__) diff --git a/resources/lib/library_sync/full_sync.py b/resources/lib/library_sync/full_sync.py index 609d45b6..87c6bafd 100644 --- a/resources/lib/library_sync/full_sync.py +++ b/resources/lib/library_sync/full_sync.py @@ -3,15 +3,15 @@ from __future__ import absolute_import, division, unicode_literals from logging import getLogger import Queue -import copy import xbmcgui -from .get_metadata import GetMetadataTask, reset_collections +from .get_metadata import GetMetadataThread +from .fill_metadata_queue import FillMetadataQueue +from .process_metadata import ProcessMetadataThread from . import common, sections from .. import utils, timing, backgroundthread, variables as v, app from .. import plex_functions as PF, itemtypes -from ..plex_db import PlexDB if common.PLAYLIST_SYNC_ENABLED: from .. import playlists @@ -19,222 +19,107 @@ if common.PLAYLIST_SYNC_ENABLED: LOG = getLogger('PLEX.sync.full_sync') # How many items will be put through the processing chain at once? -BATCH_SIZE = 2000 +BATCH_SIZE = 250 +# Size of queue for xmls to be downloaded from PMS for/and before processing +QUEUE_BUFFER = 50 +# Max number of xmls held in memory +MAX_QUEUE_SIZE = 500 # Safety margin to filter PMS items - how many seconds to look into the past? UPDATED_AT_SAFETY = 60 * 5 LAST_VIEWED_AT_SAFETY = 60 * 5 -class InitNewSection(object): - """ - Throw this into the queue used for ProcessMetadata to tell it which - Plex library section we're looking at - """ - def __init__(self, context, total_number_of_items, section_name, - section_id, plex_type): - self.context = context - self.total = total_number_of_items - self.name = section_name - self.id = section_id - self.plex_type = plex_type - - -class FullSync(backgroundthread.KillableThread): +class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread): def __init__(self, repair, callback, show_dialog): """ repair=True: force sync EVERY item """ self.repair = repair self.callback = callback - self.queue = None - self.process_thread = None - self.current_sync = None - self.plexdb = None - self.plex_type = None - self.section_type = None - self.worker_count = int(utils.settings('syncThreadNumber')) - self.item_count = 0 # For progress dialog self.show_dialog = show_dialog self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true' - self.dialog = None - self.total = 0 - self.current = 0 - self.processed = 0 - self.title = '' - self.section = None - self.section_name = None - self.section_type_text = None - self.context = None - self.get_children = None - self.successful = None - self.section_success = None + if self.show_dialog: + self.dialog = xbmcgui.DialogProgressBG() + self.dialog.create(utils.lang(39714)) + else: + self.dialog = None + + self.section_queue = Queue.Queue() + self.get_metadata_queue = Queue.Queue(maxsize=5000) + self.processing_queue = backgroundthread.ProcessingQueue(maxsize=500) + self.current_time = timing.plex_now() + self.last_section = sections.Section() + + self.successful = True self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true' - self.threader = backgroundthread.ThreaderManager( - worker=backgroundthread.NonstoppingBackgroundWorker, - worker_count=self.worker_count) + self.threads = [ + 
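+            # One metadata download thread per sync thread configured in the
+            # PKC settings under 'syncThreadNumber'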
GetMetadataThread(self.get_metadata_queue, self.processing_queue) + for _ in range(int(utils.settings('syncThreadNumber'))) + ] + for t in self.threads: + t.start() super(FullSync, self).__init__() - def suspend(self, block=False, timeout=None): - """ - Let's NOT suspend sync threads but immediately terminate them - """ - self.cancel() - - def update_progressbar(self): - if self.dialog: - try: - progress = int(float(self.current) / float(self.total) * 100.0) - except ZeroDivisionError: - progress = 0 - self.dialog.update(progress, - '%s (%s)' % (self.section_name, self.section_type_text), - '%s %s/%s' - % (self.title, self.current, self.total)) - if app.APP.is_playing_video: - self.dialog.close() - self.dialog = None - - def process_item(self, xml_item): - """ - Processes a single library item - """ - plex_id = int(xml_item.get('ratingKey')) - if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \ - int('%s%s' % (plex_id, - xml_item.get('updatedAt', - xml_item.get('addedAt', 1541572987)))): + def update_progressbar(self, section, title, current): + if not self.dialog: return - self.threader.addTask(GetMetadataTask(self.queue, - plex_id, - self.plex_type, - self.get_children, - self.item_count)) - self.item_count += 1 - - def update_library(self): - LOG.debug('Writing changes to Kodi library now') - i = 0 - if not self.section: - _, self.section = self.queue.get() - self.queue.task_done() - while not self.should_cancel() and self.item_count > 0: - section = self.section - if not section: - break - LOG.debug('Start or continue processing section %s (%ss)', - section.name, section.plex_type) - self.processed = 0 - self.total = section.total - self.section_name = section.name - self.section_type_text = utils.lang( - v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) - with section.context(self.current_sync) as context: - while not self.should_cancel() and self.item_count > 0: - try: - _, item = self.queue.get(block=False) - except backgroundthread.Queue.Empty: - if self.threader.threader.working(): - self.sleep(0.02) - continue - else: - # Try again, in case a thread just finished - i += 1 - if i == 3: - break - continue - i = 0 - self.queue.task_done() - if isinstance(item, dict): - context.add_update(item['xml'][0], - section_name=section.name, - section_id=section.id, - children=item['children']) - self.title = item['xml'][0].get('title') - self.processed += 1 - elif isinstance(item, InitNewSection) or item is None: - self.section = item - break - else: - raise ValueError('Unknown type %s' % type(item)) - self.item_count -= 1 - self.current += 1 - self.update_progressbar() - if self.processed == 500: - self.processed = 0 - context.commit() - LOG.debug('Done writing changes to Kodi library') - - @utils.log_time - def addupdate_section(self, section): - LOG.debug('Processing library section for new or changed items %s', - section) - if not self.install_sync_done: - app.SYNC.path_verified = False + current += 1 try: - # Sync new, updated and deleted items - iterator = section.iterator - # Tell the processing thread about this new section - queue_info = InitNewSection(section.context, - iterator.total, - iterator.get('librarySectionTitle', - iterator.get('title1')), - section.section_id, - section.plex_type) - self.queue.put((-1, queue_info)) - last = True - # To keep track of the item-number in order to kill while loops - self.item_count = 0 - self.current = 0 - # Initialize only once to avoid loosing the last value before - # we're breaking the for loop - loop = 
common.tag_last(iterator) - while True: - # Check Plex DB to see what we need to add/update - with PlexDB() as self.plexdb: - for last, xml_item in loop: - if self.should_cancel(): - return False - self.process_item(xml_item) - if self.item_count == BATCH_SIZE: - break - # Make sure Plex DB above is closed before adding/updating! - self.update_library() - if last: - break - reset_collections() - return True - except RuntimeError: - LOG.error('Could not entirely process section %s', section) - return False + progress = int(float(current) / float(section.number_of_items) * 100.0) + except ZeroDivisionError: + progress = 0 + self.dialog.update(progress, + '%s (%s)' % (section.name, section.section_type_text), + '%s %s/%s' + % (title, current, section.number_of_items)) + if app.APP.is_playing_video: + self.dialog.close() + self.dialog = None @utils.log_time + def processing_loop_new_and_changed_items(self): + LOG.debug('Start working') + scanner_thread = FillMetadataQueue(self.repair, + self.section_queue, + self.get_metadata_queue) + scanner_thread.start() + process_thread = ProcessMetadataThread(self.current_time, + self.processing_queue, + self.update_progressbar) + process_thread.start() + LOG.debug('Waiting for scanner thread to finish up') + scanner_thread.join() + LOG.debug('Waiting for metadata download threads to finish up') + for t in self.threads: + t.join() + LOG.debug('Download metadata threads finished') + # Sentinel for the process_thread once we added everything else + self.processing_queue.put_sentinel(sections.Section()) + process_thread.join() + self.successful = process_thread.successful + LOG.debug('threads finished work. successful: %s', self.successful) + + @utils.log_time + def processing_loop_playstates(self): + while not self.should_cancel(): + section = self.section_queue.get() + self.section_queue.task_done() + if section is None: + break + self.playstate_per_section(section) + def playstate_per_section(self, section): LOG.debug('Processing %s playstates for library section %s', - section.iterator.total, section) + section.number_of_items, section) try: - # Sync new, updated and deleted items iterator = section.iterator - # Tell the processing thread about this new section - queue_info = InitNewSection(section.context, - iterator.total, - section.name, - section.section_id, - section.plex_type) - self.queue.put((-1, queue_info)) - self.total = iterator.total - self.section_name = section.name - self.section_type_text = utils.lang( - v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) - self.current = 0 - + iterator = common.tag_last(iterator) last = True - loop = common.tag_last(iterator) - while True: - with section.context(self.current_sync) as itemtype: - for i, (last, xml_item) in enumerate(loop): - if self.should_cancel(): - return False + while not self.should_cancel(): + with section.context(self.current_time) as itemtype: + for last, xml_item in iterator: + section.count += 1 if not itemtype.update_userdata(xml_item, section.plex_type): # Somehow did not sync this item yet itemtype.add_update(xml_item, @@ -242,22 +127,21 @@ class FullSync(backgroundthread.KillableThread): section_id=section.section_id) itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']), section.plex_type, - self.current_sync) - self.current += 1 - self.update_progressbar() - if (i + 1) % (10 * BATCH_SIZE) == 0: + self.current_time) + self.update_progressbar(section, '', section.count) + if section.count % (10 * BATCH_SIZE) == 0: break if last: break - return True except 
RuntimeError: LOG.error('Could not entirely process section %s', section) - return False + self.successful = False - def threaded_get_iterators(self, kinds, queue, all_items=False): + def threaded_get_iterators(self, kinds, queue, all_items): """ Getting iterators is costly, so let's do it asynchronously """ + LOG.debug('Start threaded_get_iterators') try: for kind in kinds: for section in (x for x in app.SYNC.sections @@ -268,86 +152,58 @@ class FullSync(backgroundthread.KillableThread): if not section.sync_to_kodi: LOG.info('User chose to not sync section %s', section) continue - element = copy.deepcopy(section) - element.plex_type = kind[0] - element.section_type = element.plex_type - element.context = kind[2] - element.get_children = kind[3] - element.Queue = kind[4] + section = sections.get_sync_section(section, + plex_type=kind[0]) if self.repair or all_items: updated_at = None else: updated_at = section.last_sync - UPDATED_AT_SAFETY \ if section.last_sync else None try: - element.iterator = PF.get_section_iterator( + section.iterator = PF.get_section_iterator( section.section_id, - plex_type=element.plex_type, + plex_type=section.plex_type, updated_at=updated_at, last_viewed_at=None) except RuntimeError: - LOG.warn('Sync at least partially unsuccessful') - self.successful = False - self.section_success = False + LOG.error('Sync at least partially unsuccessful!') + LOG.error('Error getting section iterator %s', section) else: - queue.put(element) + section.number_of_items = section.iterator.total + if section.number_of_items > 0: + self.processing_queue.add_section(section) + queue.put(section) + LOG.debug('Put section in queue with %s items: %s', + section.number_of_items, section) except Exception: utils.ERROR(notify=True) finally: queue.put(None) + LOG.debug('Exiting threaded_get_iterators') def full_library_sync(self): - """ - """ - # structure: - # (plex_type, - # section_type, - # context for itemtype, - # download children items, e.g. songs for a specific album?, - # Queue) kinds = [ - (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE, itemtypes.Movie, False, Queue.Queue), - (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW, itemtypes.Show, False, Queue.Queue), - (v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW, itemtypes.Season, False, Queue.Queue), - (v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW, itemtypes.Episode, False, Queue.Queue) + (v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE), + (v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW), + (v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW), + (v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW) ] if app.SYNC.enable_music: kinds.extend([ - (v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False, Queue.Queue), - (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, backgroundthread.OrderedQueue), + (v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST), + (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST), ]) # ADD NEW ITEMS # Already start setting up the iterators. We need to enforce # syncing e.g. 
show before season before episode - iterator_queue = Queue.Queue() - task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, - None, - kinds, - iterator_queue) - backgroundthread.BGThreader.addTask(task) - while True: - self.section_success = True - section = iterator_queue.get() - iterator_queue.task_done() - if section is None: - break - # Setup our variables - self.plex_type = section.plex_type - self.section_type = section.section_type - self.context = section.context - self.get_children = section.get_children - self.queue = section.Queue() - # Now do the heavy lifting - if self.should_cancel() or not self.addupdate_section(section): - return False - if self.section_success: - # Need to check because a thread might have missed to get - # some items from the PMS - with PlexDB() as plexdb: - # Set the new time mark for the next delta sync - plexdb.update_section_last_sync(section.section_id, - self.current_sync) + backgroundthread.KillableThread( + target=self.threaded_get_iterators, + args=(kinds, self.section_queue, False)).start() + # Do the heavy lifting + self.processing_loop_new_and_changed_items() common.update_kodi_library(video=True, music=True) + if self.should_cancel() or not self.successful: + return # Sync Plex playlists to Kodi and vice-versa if common.PLAYLIST_SYNC_ENABLED: @@ -357,48 +213,29 @@ class FullSync(backgroundthread.KillableThread): self.dialog = xbmcgui.DialogProgressBG() # "Synching playlists" self.dialog.create(utils.lang(39715)) - if not playlists.full_sync(): - return False + if not playlists.full_sync() or self.should_cancel(): + return # SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that # were set to unwatched). Also mark all items on the PMS to be able # to delete the ones still in Kodi - LOG.info('Start synching playstate and userdata for every item') - # In order to not delete all your songs again + LOG.debug('Start synching playstate and userdata for every item') if app.SYNC.enable_music: - # We don't need to enforce the album order now - kinds.pop(5) + # In order to not delete all your songs again kinds.extend([ - (v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, Queue.Queue), - (v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST, itemtypes.Song, True, Queue.Queue), + (v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST), ]) # Make sure we're not showing an item's title in the sync dialog - self.title = '' - self.threader.shutdown() - self.threader = None if not self.show_dialog_userdata and self.dialog: # Close the progress indicator dialog self.dialog.close() self.dialog = None - task = backgroundthread.FunctionAsTask(self.threaded_get_iterators, - None, - kinds, - iterator_queue, - all_items=True) - backgroundthread.BGThreader.addTask(task) - while True: - section = iterator_queue.get() - iterator_queue.task_done() - if section is None: - break - # Setup our variables - self.plex_type = section.plex_type - self.section_type = section.section_type - self.context = section.context - self.get_children = section.get_children - # Now do the heavy lifting - if self.should_cancel() or not self.playstate_per_section(section): - return False + backgroundthread.KillableThread( + target=self.threaded_get_iterators, + args=(kinds, self.section_queue, True)).start() + self.processing_loop_playstates() + if self.should_cancel() or not self.successful: + return # Delete movies that are not on Plex anymore LOG.debug('Looking for items to delete') @@ -417,60 +254,49 @@ class FullSync(backgroundthread.KillableThread): for plex_type, context in 
kinds: # Delete movies that are not on Plex anymore while True: - with context(self.current_sync) as ctx: - plex_ids = list(ctx.plexdb.plex_id_by_last_sync(plex_type, - self.current_sync, - BATCH_SIZE)) + with context(self.current_time) as ctx: + plex_ids = list( + ctx.plexdb.plex_id_by_last_sync(plex_type, + self.current_time, + BATCH_SIZE)) for plex_id in plex_ids: if self.should_cancel(): - return False + return ctx.remove(plex_id, plex_type) if len(plex_ids) < BATCH_SIZE: break - LOG.debug('Done deleting') - return True + LOG.debug('Done looking for items to delete') def run(self): app.APP.register_thread(self) + LOG.info('Running library sync with repair=%s', self.repair) try: - self._run() + self.run_full_library_sync() finally: app.APP.deregister_thread(self) - LOG.info('Done full_sync') + LOG.info('Library sync done. successful: %s', self.successful) @utils.log_time - def _run(self): - self.current_sync = timing.plex_now() - # Get latest Plex libraries and build playlist and video node files - if self.should_cancel() or not sections.sync_from_pms(self): - return - self.successful = True + def run_full_library_sync(self): try: - if self.show_dialog: - self.dialog = xbmcgui.DialogProgressBG() - self.dialog.create(utils.lang(39714)) - - # Actual syncing - do only new items first - LOG.info('Running full_library_sync with repair=%s', - self.repair) - if self.should_cancel() or not self.full_library_sync(): + # Get latest Plex libraries and build playlist and video node files + if self.should_cancel() or not sections.sync_from_pms(self): + return + if self.should_cancel(): self.successful = False return + self.full_library_sync() finally: common.update_kodi_library(video=True, music=True) if self.dialog: self.dialog.close() - if self.threader: - self.threader.shutdown() - self.threader = None if not self.successful and not self.should_cancel(): # "ERROR in library sync" utils.dialog('notification', heading='{plex}', message=utils.lang(39410), icon='{error}') - if self.callback: - self.callback(self.successful) + self.callback(self.successful) def start(show_dialog, repair=False, callback=None): diff --git a/resources/lib/library_sync/get_metadata.py b/resources/lib/library_sync/get_metadata.py index 427169e2..972e222c 100644 --- a/resources/lib/library_sync/get_metadata.py +++ b/resources/lib/library_sync/get_metadata.py @@ -2,73 +2,46 @@ from __future__ import absolute_import, division, unicode_literals from logging import getLogger +from . import common from ..plex_api import API -from .. import plex_functions as PF, backgroundthread, utils, variables as v - - -LOG = getLogger("PLEX." + __name__) +from .. import backgroundthread, plex_functions as PF, utils, variables as v +from .. import app +LOG = getLogger('PLEX.sync.get_metadata') LOCK = backgroundthread.threading.Lock() -# List of tuples: (collection index [as in an item's metadata with "Collection -# id"], collection plex id) -COLLECTION_MATCH = None -# Dict with entries of the form : -COLLECTION_XMLS = {} -def reset_collections(): - """ - Collections seem unique to Plex sections - """ - global LOCK, COLLECTION_MATCH, COLLECTION_XMLS - with LOCK: - COLLECTION_MATCH = None - COLLECTION_XMLS = {} - - -class GetMetadataTask(backgroundthread.Task): +class GetMetadataThread(common.LibrarySyncMixin, + backgroundthread.KillableThread): """ Threaded download of Plex XML metadata for a certain library item. 
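    One such thread is started for every sync thread configured under
    'syncThreadNumber' (see full_sync.py).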
Fills the queue with the downloaded etree XML objects - - Input: - queue Queue.Queue() object where this thread will store - the downloaded metadata XMLs as etree objects """ - def __init__(self, queue, plex_id, plex_type, get_children=False, - count=None): - self.queue = queue - self.plex_id = plex_id - self.plex_type = plex_type - self.get_children = get_children - self.count = count - super(GetMetadataTask, self).__init__() - - def suspend(self, block=False, timeout=None): - """ - Let's NOT suspend sync threads but immediately terminate them - """ - self.cancel() + def __init__(self, get_metadata_queue, processing_queue): + self.get_metadata_queue = get_metadata_queue + self.processing_queue = processing_queue + super(GetMetadataThread, self).__init__() def _collections(self, item): - global COLLECTION_MATCH, COLLECTION_XMLS api = API(item['xml'][0]) - if COLLECTION_MATCH is None: - COLLECTION_MATCH = PF.collections(api.library_section_id()) - if COLLECTION_MATCH is None: + collection_match = item['section'].collection_match + collection_xmls = item['section'].collection_xmls + if collection_match is None: + collection_match = PF.collections(api.library_section_id()) + if collection_match is None: LOG.error('Could not download collections') return # Extract what we need to know - COLLECTION_MATCH = \ + collection_match = \ [(utils.cast(int, x.get('index')), - utils.cast(int, x.get('ratingKey'))) for x in COLLECTION_MATCH] + utils.cast(int, x.get('ratingKey'))) for x in collection_match] item['children'] = {} for plex_set_id, set_name in api.collections(): if self.should_cancel(): return - if plex_set_id not in COLLECTION_XMLS: + if plex_set_id not in collection_xmls: # Get Plex metadata for collections - a pain - for index, collection_plex_id in COLLECTION_MATCH: + for index, collection_plex_id in collection_match: if index == plex_set_id: collection_xml = PF.GetPlexMetadata(collection_plex_id) try: @@ -77,54 +50,84 @@ class GetMetadataTask(backgroundthread.Task): LOG.error('Could not get collection %s %s', collection_plex_id, set_name) continue - COLLECTION_XMLS[plex_set_id] = collection_xml + collection_xmls[plex_set_id] = collection_xml break else: LOG.error('Did not find Plex collection %s %s', plex_set_id, set_name) continue - item['children'][plex_set_id] = COLLECTION_XMLS[plex_set_id] + item['children'][plex_set_id] = collection_xmls[plex_set_id] + + def _process_abort(self, count, section): + # Make sure other threads will also receive sentinel + self.get_metadata_queue.put(None) + if count is not None: + self._process_skipped_item(count, section) + + def _process_skipped_item(self, count, section): + section.sync_successful = False + # Add a "dummy" item so we're not skipping a beat + self.processing_queue.put((count, {'section': section, 'xml': None})) def run(self): - """ - Do the work - """ - if self.should_cancel(): - return - # Download Metadata - item = { - 'xml': PF.GetPlexMetadata(self.plex_id), - 'children': None - } - if item['xml'] is None: - # Did not receive a valid XML - skip that item for now - LOG.error("Could not get metadata for %s. Skipping that item " - "for now", self.plex_id) - return - elif item['xml'] == 401: - LOG.error('HTTP 401 returned by PMS. Too much strain? 
' - 'Cancelling sync for now') - utils.window('plex_scancrashed', value='401') - return - if not self.should_cancel() and self.plex_type == v.PLEX_TYPE_MOVIE: - # Check for collections/sets - collections = False - for child in item['xml'][0]: - if child.tag == 'Collection': - collections = True - break - if collections: - global LOCK - with LOCK: - self._collections(item) - if not self.should_cancel() and self.get_children: - children_xml = PF.GetAllPlexChildren(self.plex_id) + LOG.debug('Starting %s thread', self.__class__.__name__) + app.APP.register_thread(self) + try: + self._run() + finally: + app.APP.deregister_thread(self) + LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__) + + def _run(self): + while True: + item = self.get_metadata_queue.get() try: - children_xml[0].attrib - except (TypeError, IndexError, AttributeError): - LOG.error('Could not get children for Plex id %s', - self.plex_id) - else: - item['children'] = children_xml - if not self.should_cancel(): - self.queue.put((self.count, item)) + if item is None or self.should_cancel(): + self._process_abort(item[0] if item else None, + item[2] if item else None) + break + count, plex_id, section = item + item = { + 'xml': PF.GetPlexMetadata(plex_id), # This will block + 'children': None, + 'section': section + } + if item['xml'] is None: + # Did not receive a valid XML - skip that item for now + LOG.error("Could not get metadata for %s. Skipping item " + "for now", plex_id) + self._process_skipped_item(count, section) + continue + elif item['xml'] == 401: + LOG.error('HTTP 401 returned by PMS. Too much strain? ' + 'Cancelling sync for now') + utils.window('plex_scancrashed', value='401') + self._process_abort(count, section) + break + if section.plex_type == v.PLEX_TYPE_MOVIE: + # Check for collections/sets + collections = False + for child in item['xml'][0]: + if child.tag == 'Collection': + collections = True + break + if collections: + with LOCK: + self._collections(item) + if section.get_children: + if self.should_cancel(): + self._process_abort(count, section) + break + children_xml = PF.GetAllPlexChildren(plex_id) # Will block + try: + children_xml[0].attrib + except (TypeError, IndexError, AttributeError): + LOG.error('Could not get children for Plex id %s', + plex_id) + self._process_skipped_item(count, section) + continue + else: + item['children'] = children_xml + self.processing_queue.put((count, item)) + finally: + self.get_metadata_queue.task_done() diff --git a/resources/lib/library_sync/process_metadata.py b/resources/lib/library_sync/process_metadata.py new file mode 100644 index 00000000..abc70fdd --- /dev/null +++ b/resources/lib/library_sync/process_metadata.py @@ -0,0 +1,104 @@ +# -*- coding: utf-8 -*- +from __future__ import absolute_import, division, unicode_literals +from logging import getLogger + +from . import common, sections +from ..plex_db import PlexDB +from .. 
import backgroundthread, app + +LOG = getLogger('PLEX.sync.process_metadata') + +COMMIT_TO_DB_EVERY_X_ITEMS = 500 + + +class ProcessMetadataThread(common.LibrarySyncMixin, + backgroundthread.KillableThread): + """ + Invoke once in order to process the received PMS metadata xmls + """ + def __init__(self, current_time, processing_queue, update_progressbar): + self.current_time = current_time + self.processing_queue = processing_queue + self.update_progressbar = update_progressbar + self.last_section = sections.Section() + self.successful = True + super(ProcessMetadataThread, self).__init__() + + def start_section(self, section): + if section != self.last_section: + if self.last_section: + self.finish_last_section() + LOG.debug('Start or continue processing section %s', section) + self.last_section = section + # Warn the user for this new section if we cannot access a file + app.SYNC.path_verified = False + else: + LOG.debug('Resume processing section %s', section) + + def finish_last_section(self): + if (not self.should_cancel() and self.last_section and + self.last_section.sync_successful): + # Check for should_cancel() because we cannot be sure that we + # processed every item of the section + with PlexDB() as plexdb: + # Set the new time mark for the next delta sync + plexdb.update_section_last_sync(self.last_section.section_id, + self.current_time) + LOG.info('Finished processing section successfully: %s', + self.last_section) + elif self.last_section and not self.last_section.sync_successful: + LOG.warn('Sync not successful for section %s', self.last_section) + self.successful = False + + def _get(self): + item = {'xml': None} + while not self.should_cancel() and item and item['xml'] is None: + item = self.processing_queue.get() + self.processing_queue.task_done() + return item + + def run(self): + LOG.debug('Starting %s thread', self.__class__.__name__) + app.APP.register_thread(self) + try: + self._run() + except Exception: + from .. import utils + utils.ERROR(notify=True) + finally: + app.APP.deregister_thread(self) + LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__) + + def _run(self): + # There are 2 sentinels: None for aborting/ending this thread, the dict + # {'section': section, 'xml': None} for skipped/invalid items + item = self._get() + if item: + section = item['section'] + processed = 0 + self.start_section(section) + while not self.should_cancel(): + if item is None: + break + elif item['section'] != section: + # We received an entirely new section + self.start_section(item['section']) + section = item['section'] + with section.context(self.current_time) as context: + while not self.should_cancel(): + if item is None or item['section'] != section: + break + self.update_progressbar(section, + item['xml'][0].get('title'), + section.count) + context.add_update(item['xml'][0], + section_name=section.name, + section_id=section.section_id, + children=item['children']) + processed += 1 + section.count += 1 + if processed == COMMIT_TO_DB_EVERY_X_ITEMS: + processed = 0 + context.commit() + item = self._get() + self.finish_last_section() diff --git a/resources/lib/library_sync/sections.py b/resources/lib/library_sync/sections.py index 451673b3..da97e455 100644 --- a/resources/lib/library_sync/sections.py +++ b/resources/lib/library_sync/sections.py @@ -54,6 +54,8 @@ class Section(object): self.content = None # unicode # Setting the section_type WILL re_set sync_to_kodi! self._section_type = None # unicode + # E.g. 
"season" or "movie" (translated) + self.section_type_text = None # Do we sync all items of this section to the Kodi DB? # This will be set with section_type!! self.sync_to_kodi = None # bool @@ -77,13 +79,9 @@ class Section(object): self.order = None # Original PMS xml for this section, including children self.xml = None - # Attributes that will be initialized later by full_sync.py - self.iterator = None - self.context = None - self.get_children = None # A section_type encompasses possible several plex_types! E.g. shows # contain shows, seasons, episodes - self.plex_type = None + self._plex_type = None if xml_element is not None: self.from_xml(xml_element) elif section_db_element: @@ -106,9 +104,14 @@ class Section(object): self.section_type is not None) def __eq__(self, section): + """ + Sections compare equal if their section_id, name and plex_type (first + prio) OR section_type (if there is no plex_type is set) compare equal + """ return (self.section_id == section.section_id and self.name == section.name and - self.section_type == section.section_type) + (self.plex_type == section.plex_type if self.plex_type else + self.section_type == section.section_type)) def __ne__(self, section): return not self == section @@ -140,6 +143,15 @@ class Section(object): else: self.sync_to_kodi = True + @property + def plex_type(self): + return self._plex_type + + @plex_type.setter + def plex_type(self, value): + self._plex_type = value + self.section_type_text = utils.lang(v.TRANSLATION_FROM_PLEXTYPE[value]) + @property def index(self): return self._index @@ -431,6 +443,39 @@ class Section(object): self.remove_from_plex() +def _get_children(plex_type): + if plex_type == v.PLEX_TYPE_ALBUM: + return True + else: + return False + + +def get_sync_section(section, plex_type): + """ + Deep-copies section and adds certain arguments in order to prep section + for the library sync + """ + section = copy.deepcopy(section) + section.plex_type = plex_type + section.context = itemtypes.ITEMTYPE_FROM_PLEXTYPE[plex_type] + section.get_children = _get_children(plex_type) + # Some more init stuff + # Has sync for this section been successful? + section.sync_successful = True + # List of tuples: (collection index [as in an item's metadata with + # "Collection id"], collection plex id) + section.collection_match = None + # Dict with entries of the form : + section.collection_xmls = {} + # Keep count during sync + section.count = 0 + # Total number of items that we need to sync + section.number_of_items = 0 + # Iterator to get one sync item after the other + section.iterator = None + return section + + def force_full_sync(): """ Resets the sync timestamp for all sections to 0, thus forcing a subsequent