Rewire library sync to speed it up and fix sync getting stuck in rare cases
This commit is contained in:
parent
f4ea051c81
commit
8f86f43a93
7 changed files with 693 additions and 457 deletions
|
@ -2,12 +2,15 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
from time import time as _time
|
||||
import threading
|
||||
import Queue
|
||||
import heapq
|
||||
from collections import deque
|
||||
|
||||
import xbmc
|
||||
|
||||
from . import utils, app
|
||||
from . import utils, app, variables as v
|
||||
|
||||
LOG = getLogger('PLEX.threads')
|
||||
|
||||
|
@ -36,8 +39,8 @@ class KillableThread(threading.Thread):
|
|||
"""
|
||||
self._canceled = True
|
||||
# Make sure thread is running in order to exit quickly
|
||||
self._is_not_suspended.set()
|
||||
self._is_not_asleep.set()
|
||||
self._is_not_suspended.set()
|
||||
|
||||
def should_suspend(self):
|
||||
"""
|
||||
|
@ -66,8 +69,8 @@ class KillableThread(threading.Thread):
|
|||
back to life
|
||||
"""
|
||||
self._suspended = False
|
||||
self._is_not_suspended.set()
|
||||
self._is_not_asleep.set()
|
||||
self._is_not_suspended.set()
|
||||
|
||||
def wait_while_suspended(self):
|
||||
"""
|
||||
|
@ -104,6 +107,166 @@ class KillableThread(threading.Thread):
|
|||
return not self._is_not_asleep.is_set()
|
||||
|
||||
|
||||
class ProcessingQueue(Queue.Queue, object):
|
||||
"""
|
||||
Queue of queues that processes a queue completely before moving on to the
|
||||
next queue. There's one queue per Section(). You need to initialize each
|
||||
section with add_section(section) first.
|
||||
Put tuples (count, item) into this queue, with count being the respective
|
||||
position of the item in the queue, starting with 0 (zero).
|
||||
(None, None) is the sentinel for a single queue being exhausted, added by
|
||||
put_sentinel()
|
||||
"""
|
||||
def _init(self, maxsize):
|
||||
self.queue = deque()
|
||||
self._sections = deque()
|
||||
self._queues = deque()
|
||||
self._current_section = None
|
||||
self._current_queue = None
|
||||
self._counter = 0
|
||||
|
||||
def _qsize(self):
|
||||
return self._current_queue._qsize() if self._current_queue else 0
|
||||
|
||||
def total_size(self):
|
||||
"""
|
||||
Return the approximate total size of all queues (not reliable!)
|
||||
"""
|
||||
self.mutex.acquire()
|
||||
n = sum(q._qsize() for q in self._queues) if self._queues else 0
|
||||
self.mutex.release()
|
||||
return n
|
||||
|
||||
def put(self, item, block=True, timeout=None):
|
||||
"""Put an item into the queue.
|
||||
|
||||
If optional args 'block' is true and 'timeout' is None (the default),
|
||||
block if necessary until a free slot is available. If 'timeout' is
|
||||
a non-negative number, it blocks at most 'timeout' seconds and raises
|
||||
the Full exception if no free slot was available within that time.
|
||||
Otherwise ('block' is false), put an item on the queue if a free slot
|
||||
is immediately available, else raise the Full exception ('timeout'
|
||||
is ignored in that case).
|
||||
"""
|
||||
self.not_full.acquire()
|
||||
try:
|
||||
if self.maxsize > 0:
|
||||
if not block:
|
||||
# Use >= instead of == due to OrderedQueue!
|
||||
if self._qsize() >= self.maxsize:
|
||||
raise Queue.Full
|
||||
elif timeout is None:
|
||||
while self._qsize() >= self.maxsize:
|
||||
self.not_full.wait()
|
||||
elif timeout < 0:
|
||||
raise ValueError("'timeout' must be a non-negative number")
|
||||
else:
|
||||
endtime = _time() + timeout
|
||||
while self._qsize() >= self.maxsize:
|
||||
remaining = endtime - _time()
|
||||
if remaining <= 0.0:
|
||||
raise Queue.Full
|
||||
self.not_full.wait(remaining)
|
||||
if self._put(item) == 0:
|
||||
# Only notify one waiting thread if this item is put into the
|
||||
# current queue
|
||||
self.not_empty.notify()
|
||||
else:
|
||||
# Be sure to signal not_empty only once!
|
||||
self._unlock_after_section_change()
|
||||
self.unfinished_tasks += 1
|
||||
finally:
|
||||
self.not_full.release()
|
||||
|
||||
def _put(self, item):
|
||||
"""
|
||||
Returns the index of the section in whose subqueue we need to put the
|
||||
item into
|
||||
"""
|
||||
for i, section in enumerate(self._sections):
|
||||
if item[1]['section'] == section:
|
||||
self._queues[i]._put(item)
|
||||
break
|
||||
else:
|
||||
raise RuntimeError('Could not find section for item %s' % item[1])
|
||||
return i
|
||||
|
||||
def _unlock_after_section_change(self):
|
||||
"""
|
||||
Ugly work-around if we expected more items to be synced, but we had
|
||||
to lower our section.number_of_items because PKC decided that nothing
|
||||
changed and we don't need to sync the respective item(s).
|
||||
get() thus might block indefinitely
|
||||
"""
|
||||
while (self._current_section and
|
||||
self._counter == self._current_section.number_of_items):
|
||||
LOG.debug('Signaling completion of current section')
|
||||
self._init_next_section()
|
||||
if self._current_queue and self._current_queue._qsize():
|
||||
LOG.debug('Signaling not_empty')
|
||||
self.not_empty.notify()
|
||||
|
||||
def put_sentinel(self, section):
|
||||
"""
|
||||
Adds a new empty section as a sentinel. Call with an empty Section()
|
||||
object.
|
||||
Once the get()-method returns None, you've received the sentinel and
|
||||
you've thus exhausted the queue
|
||||
"""
|
||||
self.not_empty.acquire()
|
||||
try:
|
||||
section.number_of_items = 1
|
||||
self._add_section(section)
|
||||
# Add the actual sentinel to the queue we just added
|
||||
self._queues[-1]._put((None, None))
|
||||
self.unfinished_tasks += 1
|
||||
if len(self._queues) == 1:
|
||||
# queue was already exhausted!
|
||||
self._switch_queues()
|
||||
self._counter = 0
|
||||
self.not_empty.notify()
|
||||
else:
|
||||
self._unlock_after_section_change()
|
||||
finally:
|
||||
self.not_empty.release()
|
||||
|
||||
def add_section(self, section):
|
||||
"""
|
||||
Be sure to add all sections first before starting to pop items off this
|
||||
queue or adding them to the queue
|
||||
"""
|
||||
self.mutex.acquire()
|
||||
try:
|
||||
self._add_section(section)
|
||||
finally:
|
||||
self.mutex.release()
|
||||
|
||||
def _add_section(self, section):
|
||||
self._sections.append(section)
|
||||
self._queues.append(
|
||||
OrderedQueue() if section.plex_type == v.PLEX_TYPE_ALBUM
|
||||
else Queue.Queue())
|
||||
if self._current_section is None:
|
||||
self._switch_queues()
|
||||
|
||||
def _init_next_section(self):
|
||||
self._sections.popleft()
|
||||
self._queues.popleft()
|
||||
self._counter = 0
|
||||
self._switch_queues()
|
||||
|
||||
def _switch_queues(self):
|
||||
self._current_section = self._sections[0] if self._sections else None
|
||||
self._current_queue = self._queues[0] if self._queues else None
|
||||
|
||||
def _get(self):
|
||||
item = self._current_queue._get()
|
||||
self._counter += 1
|
||||
if self._counter == self._current_section.number_of_items:
|
||||
self._init_next_section()
|
||||
return item[1]
|
||||
|
||||
|
||||
class OrderedQueue(Queue.PriorityQueue, object):
|
||||
"""
|
||||
Queue that enforces an order on the items it returns. An item you push
|
||||
|
@ -111,58 +274,21 @@ class OrderedQueue(Queue.PriorityQueue, object):
|
|||
(index, item)
|
||||
where index=-1 is the item that will be returned first. The Queue will block
|
||||
until index=-1, 0, 1, 2, 3, ... is then made available
|
||||
|
||||
maxsize will be rather fuzzy, as _qsize returns 0 if we're still waiting
|
||||
for the next smalles index. put() thus might not block always when it
|
||||
should.
|
||||
"""
|
||||
def __init__(self, maxsize=0):
|
||||
self.next_index = 0
|
||||
super(OrderedQueue, self).__init__(maxsize)
|
||||
self.smallest = -1
|
||||
self.not_next_item = threading.Condition(self.mutex)
|
||||
|
||||
def _put(self, item, heappush=heapq.heappush):
|
||||
heappush(self.queue, item)
|
||||
if item[0] == self.smallest:
|
||||
self.not_next_item.notify()
|
||||
def _qsize(self, len=len):
|
||||
return len(self.queue) if self.queue[0][0] == self.next_index else 0
|
||||
|
||||
def get(self, block=True, timeout=None):
|
||||
"""Remove and return an item from the queue.
|
||||
|
||||
If optional args 'block' is true and 'timeout' is None (the default),
|
||||
block if necessary until an item is available. If 'timeout' is
|
||||
a non-negative number, it blocks at most 'timeout' seconds and raises
|
||||
the Empty exception if no item was available within that time.
|
||||
Otherwise ('block' is false), return an item if one is immediately
|
||||
available, else raise the Empty exception ('timeout' is ignored
|
||||
in that case).
|
||||
"""
|
||||
self.not_empty.acquire()
|
||||
try:
|
||||
if not block:
|
||||
if not self._qsize() or self.queue[0][0] != self.smallest:
|
||||
raise Queue.Empty
|
||||
elif timeout is None:
|
||||
while not self._qsize():
|
||||
self.not_empty.wait()
|
||||
while self.queue[0][0] != self.smallest:
|
||||
self.not_next_item.wait()
|
||||
elif timeout < 0:
|
||||
raise ValueError("'timeout' must be a non-negative number")
|
||||
else:
|
||||
endtime = Queue._time() + timeout
|
||||
while not self._qsize():
|
||||
remaining = endtime - Queue._time()
|
||||
if remaining <= 0.0:
|
||||
raise Queue.Empty
|
||||
self.not_empty.wait(remaining)
|
||||
while self.queue[0][0] != self.smallest:
|
||||
remaining = endtime - Queue._time()
|
||||
if remaining <= 0.0:
|
||||
raise Queue.Empty
|
||||
self.not_next_item.wait(remaining)
|
||||
item = self._get()
|
||||
self.smallest += 1
|
||||
self.not_full.notify()
|
||||
return item
|
||||
finally:
|
||||
self.not_empty.release()
|
||||
def _get(self, heappop=heapq.heappop):
|
||||
self.next_index += 1
|
||||
return heappop(self.queue)
|
||||
|
||||
|
||||
class Tasks(list):
|
||||
|
|
|
@ -9,6 +9,20 @@ PLAYLIST_SYNC_ENABLED = (v.DEVICE != 'Microsoft UWP' and
|
|||
utils.settings('enablePlaylistSync') == 'true')
|
||||
|
||||
|
||||
class LibrarySyncMixin(object):
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Let's NOT suspend sync threads but immediately terminate them
|
||||
"""
|
||||
self.cancel()
|
||||
|
||||
def wait_while_suspended(self):
|
||||
"""
|
||||
Return immediately
|
||||
"""
|
||||
return self.should_cancel()
|
||||
|
||||
|
||||
def update_kodi_library(video=True, music=True):
|
||||
"""
|
||||
Updates the Kodi library and thus refreshes the Kodi views and widgets
|
||||
|
|
118
resources/lib/library_sync/fill_metadata_queue.py
Normal file
118
resources/lib/library_sync/fill_metadata_queue.py
Normal file
|
@ -0,0 +1,118 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
import Queue
|
||||
from collections import deque
|
||||
|
||||
from . import common
|
||||
from ..plex_db import PlexDB
|
||||
from .. import backgroundthread, app
|
||||
|
||||
LOG = getLogger('PLEX.sync.fill_metadata_queue')
|
||||
|
||||
|
||||
def batch_sizes():
|
||||
"""
|
||||
Increase batch sizes in order to get download threads for an items xml
|
||||
metadata started soon. Corresponds to batch sizes when downloading lists
|
||||
of items from the PMS ('limitindex' in the PKC settings)
|
||||
"""
|
||||
for i in (50, 100, 200, 400):
|
||||
yield i
|
||||
while True:
|
||||
yield 1000
|
||||
|
||||
|
||||
class FillMetadataQueue(common.LibrarySyncMixin,
|
||||
backgroundthread.KillableThread, ):
|
||||
"""
|
||||
Threaded download of Plex XML metadata for a certain library item.
|
||||
Fills the queue with the downloaded etree XML objects
|
||||
|
||||
Input:
|
||||
queue Queue.Queue() object where this thread will store
|
||||
the downloaded metadata XMLs as etree objects
|
||||
"""
|
||||
def __init__(self, repair, section_queue, get_metadata_queue):
|
||||
self.repair = repair
|
||||
self.section_queue = section_queue
|
||||
self.get_metadata_queue = get_metadata_queue
|
||||
self.count = 0
|
||||
self.batch_size = batch_sizes()
|
||||
super(FillMetadataQueue, self).__init__()
|
||||
|
||||
def _loop(self, section, items):
|
||||
while items and not self.should_cancel():
|
||||
try:
|
||||
with PlexDB(lock=False) as plexdb:
|
||||
while items and not self.should_cancel():
|
||||
last, plex_id, checksum = items.popleft()
|
||||
if (not self.repair and
|
||||
plexdb.checksum(plex_id, section.plex_type) == checksum):
|
||||
continue
|
||||
if last:
|
||||
# We might have received LESS items from the PMS
|
||||
# than anticipated. Ensures that our queues finish
|
||||
section.number_of_items = self.count + 1
|
||||
self.get_metadata_queue.put((self.count, plex_id, section),
|
||||
block=False)
|
||||
self.count += 1
|
||||
except Queue.Full:
|
||||
# Close the DB for speed!
|
||||
LOG.debug('Queue full')
|
||||
self.sleep(5)
|
||||
while not self.should_cancel():
|
||||
try:
|
||||
self.get_metadata_queue.put((self.count, plex_id, section),
|
||||
block=False)
|
||||
except Queue.Full:
|
||||
LOG.debug('Queue fuller')
|
||||
self.sleep(2)
|
||||
else:
|
||||
self.count += 1
|
||||
break
|
||||
|
||||
def _process_section(self, section):
|
||||
# Initialize only once to avoid loosing the last value before we're
|
||||
# breaking the for loop
|
||||
iterator = common.tag_last(section.iterator)
|
||||
last = True
|
||||
self.count = 0
|
||||
while not self.should_cancel():
|
||||
batch_size = next(self.batch_size)
|
||||
LOG.debug('Process batch of size %s with count %s for section %s',
|
||||
batch_size, self.count, section)
|
||||
# Iterator will block for download - let's not do that when the
|
||||
# DB connection is open
|
||||
items = deque()
|
||||
for i, (last, xml) in enumerate(iterator):
|
||||
plex_id = int(xml.get('ratingKey'))
|
||||
checksum = int('{}{}'.format(
|
||||
plex_id,
|
||||
xml.get('updatedAt',
|
||||
xml.get('addedAt', '1541572987'))))
|
||||
items.append((last, plex_id, checksum))
|
||||
if i == batch_size:
|
||||
break
|
||||
self._loop(section, items)
|
||||
if last:
|
||||
break
|
||||
|
||||
def run(self):
|
||||
LOG.debug('Starting %s thread', self.__class__.__name__)
|
||||
app.APP.register_thread(self)
|
||||
try:
|
||||
while not self.should_cancel():
|
||||
section = self.section_queue.get()
|
||||
self.section_queue.task_done()
|
||||
if section is None:
|
||||
break
|
||||
self._process_section(section)
|
||||
except Exception:
|
||||
from .. import utils
|
||||
utils.ERROR(notify=True)
|
||||
finally:
|
||||
# Signal the download metadata threads to stop with a sentinel
|
||||
self.get_metadata_queue.put(None)
|
||||
app.APP.deregister_thread(self)
|
||||
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
|
|
@ -3,15 +3,15 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
import Queue
|
||||
import copy
|
||||
|
||||
import xbmcgui
|
||||
|
||||
from .get_metadata import GetMetadataTask, reset_collections
|
||||
from .get_metadata import GetMetadataThread
|
||||
from .fill_metadata_queue import FillMetadataQueue
|
||||
from .process_metadata import ProcessMetadataThread
|
||||
from . import common, sections
|
||||
from .. import utils, timing, backgroundthread, variables as v, app
|
||||
from .. import plex_functions as PF, itemtypes
|
||||
from ..plex_db import PlexDB
|
||||
|
||||
if common.PLAYLIST_SYNC_ENABLED:
|
||||
from .. import playlists
|
||||
|
@ -19,222 +19,107 @@ if common.PLAYLIST_SYNC_ENABLED:
|
|||
|
||||
LOG = getLogger('PLEX.sync.full_sync')
|
||||
# How many items will be put through the processing chain at once?
|
||||
BATCH_SIZE = 2000
|
||||
BATCH_SIZE = 250
|
||||
# Size of queue for xmls to be downloaded from PMS for/and before processing
|
||||
QUEUE_BUFFER = 50
|
||||
# Max number of xmls held in memory
|
||||
MAX_QUEUE_SIZE = 500
|
||||
# Safety margin to filter PMS items - how many seconds to look into the past?
|
||||
UPDATED_AT_SAFETY = 60 * 5
|
||||
LAST_VIEWED_AT_SAFETY = 60 * 5
|
||||
|
||||
|
||||
class InitNewSection(object):
|
||||
"""
|
||||
Throw this into the queue used for ProcessMetadata to tell it which
|
||||
Plex library section we're looking at
|
||||
"""
|
||||
def __init__(self, context, total_number_of_items, section_name,
|
||||
section_id, plex_type):
|
||||
self.context = context
|
||||
self.total = total_number_of_items
|
||||
self.name = section_name
|
||||
self.id = section_id
|
||||
self.plex_type = plex_type
|
||||
|
||||
|
||||
class FullSync(backgroundthread.KillableThread):
|
||||
class FullSync(common.LibrarySyncMixin, backgroundthread.KillableThread):
|
||||
def __init__(self, repair, callback, show_dialog):
|
||||
"""
|
||||
repair=True: force sync EVERY item
|
||||
"""
|
||||
self.repair = repair
|
||||
self.callback = callback
|
||||
self.queue = None
|
||||
self.process_thread = None
|
||||
self.current_sync = None
|
||||
self.plexdb = None
|
||||
self.plex_type = None
|
||||
self.section_type = None
|
||||
self.worker_count = int(utils.settings('syncThreadNumber'))
|
||||
self.item_count = 0
|
||||
# For progress dialog
|
||||
self.show_dialog = show_dialog
|
||||
self.show_dialog_userdata = utils.settings('playstate_sync_indicator') == 'true'
|
||||
self.dialog = None
|
||||
self.total = 0
|
||||
self.current = 0
|
||||
self.processed = 0
|
||||
self.title = ''
|
||||
self.section = None
|
||||
self.section_name = None
|
||||
self.section_type_text = None
|
||||
self.context = None
|
||||
self.get_children = None
|
||||
self.successful = None
|
||||
self.section_success = None
|
||||
if self.show_dialog:
|
||||
self.dialog = xbmcgui.DialogProgressBG()
|
||||
self.dialog.create(utils.lang(39714))
|
||||
else:
|
||||
self.dialog = None
|
||||
|
||||
self.section_queue = Queue.Queue()
|
||||
self.get_metadata_queue = Queue.Queue(maxsize=5000)
|
||||
self.processing_queue = backgroundthread.ProcessingQueue(maxsize=500)
|
||||
self.current_time = timing.plex_now()
|
||||
self.last_section = sections.Section()
|
||||
|
||||
self.successful = True
|
||||
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
||||
self.threader = backgroundthread.ThreaderManager(
|
||||
worker=backgroundthread.NonstoppingBackgroundWorker,
|
||||
worker_count=self.worker_count)
|
||||
self.threads = [
|
||||
GetMetadataThread(self.get_metadata_queue, self.processing_queue)
|
||||
for _ in range(int(utils.settings('syncThreadNumber')))
|
||||
]
|
||||
for t in self.threads:
|
||||
t.start()
|
||||
super(FullSync, self).__init__()
|
||||
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Let's NOT suspend sync threads but immediately terminate them
|
||||
"""
|
||||
self.cancel()
|
||||
|
||||
def update_progressbar(self):
|
||||
if self.dialog:
|
||||
try:
|
||||
progress = int(float(self.current) / float(self.total) * 100.0)
|
||||
except ZeroDivisionError:
|
||||
progress = 0
|
||||
self.dialog.update(progress,
|
||||
'%s (%s)' % (self.section_name, self.section_type_text),
|
||||
'%s %s/%s'
|
||||
% (self.title, self.current, self.total))
|
||||
if app.APP.is_playing_video:
|
||||
self.dialog.close()
|
||||
self.dialog = None
|
||||
|
||||
def process_item(self, xml_item):
|
||||
"""
|
||||
Processes a single library item
|
||||
"""
|
||||
plex_id = int(xml_item.get('ratingKey'))
|
||||
if not self.repair and self.plexdb.checksum(plex_id, self.plex_type) == \
|
||||
int('%s%s' % (plex_id,
|
||||
xml_item.get('updatedAt',
|
||||
xml_item.get('addedAt', 1541572987)))):
|
||||
def update_progressbar(self, section, title, current):
|
||||
if not self.dialog:
|
||||
return
|
||||
self.threader.addTask(GetMetadataTask(self.queue,
|
||||
plex_id,
|
||||
self.plex_type,
|
||||
self.get_children,
|
||||
self.item_count))
|
||||
self.item_count += 1
|
||||
|
||||
def update_library(self):
|
||||
LOG.debug('Writing changes to Kodi library now')
|
||||
i = 0
|
||||
if not self.section:
|
||||
_, self.section = self.queue.get()
|
||||
self.queue.task_done()
|
||||
while not self.should_cancel() and self.item_count > 0:
|
||||
section = self.section
|
||||
if not section:
|
||||
break
|
||||
LOG.debug('Start or continue processing section %s (%ss)',
|
||||
section.name, section.plex_type)
|
||||
self.processed = 0
|
||||
self.total = section.total
|
||||
self.section_name = section.name
|
||||
self.section_type_text = utils.lang(
|
||||
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
|
||||
with section.context(self.current_sync) as context:
|
||||
while not self.should_cancel() and self.item_count > 0:
|
||||
try:
|
||||
_, item = self.queue.get(block=False)
|
||||
except backgroundthread.Queue.Empty:
|
||||
if self.threader.threader.working():
|
||||
self.sleep(0.02)
|
||||
continue
|
||||
else:
|
||||
# Try again, in case a thread just finished
|
||||
i += 1
|
||||
if i == 3:
|
||||
break
|
||||
continue
|
||||
i = 0
|
||||
self.queue.task_done()
|
||||
if isinstance(item, dict):
|
||||
context.add_update(item['xml'][0],
|
||||
section_name=section.name,
|
||||
section_id=section.id,
|
||||
children=item['children'])
|
||||
self.title = item['xml'][0].get('title')
|
||||
self.processed += 1
|
||||
elif isinstance(item, InitNewSection) or item is None:
|
||||
self.section = item
|
||||
break
|
||||
else:
|
||||
raise ValueError('Unknown type %s' % type(item))
|
||||
self.item_count -= 1
|
||||
self.current += 1
|
||||
self.update_progressbar()
|
||||
if self.processed == 500:
|
||||
self.processed = 0
|
||||
context.commit()
|
||||
LOG.debug('Done writing changes to Kodi library')
|
||||
|
||||
@utils.log_time
|
||||
def addupdate_section(self, section):
|
||||
LOG.debug('Processing library section for new or changed items %s',
|
||||
section)
|
||||
if not self.install_sync_done:
|
||||
app.SYNC.path_verified = False
|
||||
current += 1
|
||||
try:
|
||||
# Sync new, updated and deleted items
|
||||
iterator = section.iterator
|
||||
# Tell the processing thread about this new section
|
||||
queue_info = InitNewSection(section.context,
|
||||
iterator.total,
|
||||
iterator.get('librarySectionTitle',
|
||||
iterator.get('title1')),
|
||||
section.section_id,
|
||||
section.plex_type)
|
||||
self.queue.put((-1, queue_info))
|
||||
last = True
|
||||
# To keep track of the item-number in order to kill while loops
|
||||
self.item_count = 0
|
||||
self.current = 0
|
||||
# Initialize only once to avoid loosing the last value before
|
||||
# we're breaking the for loop
|
||||
loop = common.tag_last(iterator)
|
||||
while True:
|
||||
# Check Plex DB to see what we need to add/update
|
||||
with PlexDB() as self.plexdb:
|
||||
for last, xml_item in loop:
|
||||
if self.should_cancel():
|
||||
return False
|
||||
self.process_item(xml_item)
|
||||
if self.item_count == BATCH_SIZE:
|
||||
break
|
||||
# Make sure Plex DB above is closed before adding/updating!
|
||||
self.update_library()
|
||||
if last:
|
||||
break
|
||||
reset_collections()
|
||||
return True
|
||||
except RuntimeError:
|
||||
LOG.error('Could not entirely process section %s', section)
|
||||
return False
|
||||
progress = int(float(current) / float(section.number_of_items) * 100.0)
|
||||
except ZeroDivisionError:
|
||||
progress = 0
|
||||
self.dialog.update(progress,
|
||||
'%s (%s)' % (section.name, section.section_type_text),
|
||||
'%s %s/%s'
|
||||
% (title, current, section.number_of_items))
|
||||
if app.APP.is_playing_video:
|
||||
self.dialog.close()
|
||||
self.dialog = None
|
||||
|
||||
@utils.log_time
|
||||
def processing_loop_new_and_changed_items(self):
|
||||
LOG.debug('Start working')
|
||||
scanner_thread = FillMetadataQueue(self.repair,
|
||||
self.section_queue,
|
||||
self.get_metadata_queue)
|
||||
scanner_thread.start()
|
||||
process_thread = ProcessMetadataThread(self.current_time,
|
||||
self.processing_queue,
|
||||
self.update_progressbar)
|
||||
process_thread.start()
|
||||
LOG.debug('Waiting for scanner thread to finish up')
|
||||
scanner_thread.join()
|
||||
LOG.debug('Waiting for metadata download threads to finish up')
|
||||
for t in self.threads:
|
||||
t.join()
|
||||
LOG.debug('Download metadata threads finished')
|
||||
# Sentinel for the process_thread once we added everything else
|
||||
self.processing_queue.put_sentinel(sections.Section())
|
||||
process_thread.join()
|
||||
self.successful = process_thread.successful
|
||||
LOG.debug('threads finished work. successful: %s', self.successful)
|
||||
|
||||
@utils.log_time
|
||||
def processing_loop_playstates(self):
|
||||
while not self.should_cancel():
|
||||
section = self.section_queue.get()
|
||||
self.section_queue.task_done()
|
||||
if section is None:
|
||||
break
|
||||
self.playstate_per_section(section)
|
||||
|
||||
def playstate_per_section(self, section):
|
||||
LOG.debug('Processing %s playstates for library section %s',
|
||||
section.iterator.total, section)
|
||||
section.number_of_items, section)
|
||||
try:
|
||||
# Sync new, updated and deleted items
|
||||
iterator = section.iterator
|
||||
# Tell the processing thread about this new section
|
||||
queue_info = InitNewSection(section.context,
|
||||
iterator.total,
|
||||
section.name,
|
||||
section.section_id,
|
||||
section.plex_type)
|
||||
self.queue.put((-1, queue_info))
|
||||
self.total = iterator.total
|
||||
self.section_name = section.name
|
||||
self.section_type_text = utils.lang(
|
||||
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
|
||||
self.current = 0
|
||||
|
||||
iterator = common.tag_last(iterator)
|
||||
last = True
|
||||
loop = common.tag_last(iterator)
|
||||
while True:
|
||||
with section.context(self.current_sync) as itemtype:
|
||||
for i, (last, xml_item) in enumerate(loop):
|
||||
if self.should_cancel():
|
||||
return False
|
||||
while not self.should_cancel():
|
||||
with section.context(self.current_time) as itemtype:
|
||||
for last, xml_item in iterator:
|
||||
section.count += 1
|
||||
if not itemtype.update_userdata(xml_item, section.plex_type):
|
||||
# Somehow did not sync this item yet
|
||||
itemtype.add_update(xml_item,
|
||||
|
@ -242,22 +127,21 @@ class FullSync(backgroundthread.KillableThread):
|
|||
section_id=section.section_id)
|
||||
itemtype.plexdb.update_last_sync(int(xml_item.attrib['ratingKey']),
|
||||
section.plex_type,
|
||||
self.current_sync)
|
||||
self.current += 1
|
||||
self.update_progressbar()
|
||||
if (i + 1) % (10 * BATCH_SIZE) == 0:
|
||||
self.current_time)
|
||||
self.update_progressbar(section, '', section.count)
|
||||
if section.count % (10 * BATCH_SIZE) == 0:
|
||||
break
|
||||
if last:
|
||||
break
|
||||
return True
|
||||
except RuntimeError:
|
||||
LOG.error('Could not entirely process section %s', section)
|
||||
return False
|
||||
self.successful = False
|
||||
|
||||
def threaded_get_iterators(self, kinds, queue, all_items=False):
|
||||
def threaded_get_iterators(self, kinds, queue, all_items):
|
||||
"""
|
||||
Getting iterators is costly, so let's do it asynchronously
|
||||
"""
|
||||
LOG.debug('Start threaded_get_iterators')
|
||||
try:
|
||||
for kind in kinds:
|
||||
for section in (x for x in app.SYNC.sections
|
||||
|
@ -268,86 +152,58 @@ class FullSync(backgroundthread.KillableThread):
|
|||
if not section.sync_to_kodi:
|
||||
LOG.info('User chose to not sync section %s', section)
|
||||
continue
|
||||
element = copy.deepcopy(section)
|
||||
element.plex_type = kind[0]
|
||||
element.section_type = element.plex_type
|
||||
element.context = kind[2]
|
||||
element.get_children = kind[3]
|
||||
element.Queue = kind[4]
|
||||
section = sections.get_sync_section(section,
|
||||
plex_type=kind[0])
|
||||
if self.repair or all_items:
|
||||
updated_at = None
|
||||
else:
|
||||
updated_at = section.last_sync - UPDATED_AT_SAFETY \
|
||||
if section.last_sync else None
|
||||
try:
|
||||
element.iterator = PF.get_section_iterator(
|
||||
section.iterator = PF.get_section_iterator(
|
||||
section.section_id,
|
||||
plex_type=element.plex_type,
|
||||
plex_type=section.plex_type,
|
||||
updated_at=updated_at,
|
||||
last_viewed_at=None)
|
||||
except RuntimeError:
|
||||
LOG.warn('Sync at least partially unsuccessful')
|
||||
self.successful = False
|
||||
self.section_success = False
|
||||
LOG.error('Sync at least partially unsuccessful!')
|
||||
LOG.error('Error getting section iterator %s', section)
|
||||
else:
|
||||
queue.put(element)
|
||||
section.number_of_items = section.iterator.total
|
||||
if section.number_of_items > 0:
|
||||
self.processing_queue.add_section(section)
|
||||
queue.put(section)
|
||||
LOG.debug('Put section in queue with %s items: %s',
|
||||
section.number_of_items, section)
|
||||
except Exception:
|
||||
utils.ERROR(notify=True)
|
||||
finally:
|
||||
queue.put(None)
|
||||
LOG.debug('Exiting threaded_get_iterators')
|
||||
|
||||
def full_library_sync(self):
|
||||
"""
|
||||
"""
|
||||
# structure:
|
||||
# (plex_type,
|
||||
# section_type,
|
||||
# context for itemtype,
|
||||
# download children items, e.g. songs for a specific album?,
|
||||
# Queue)
|
||||
kinds = [
|
||||
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE, itemtypes.Movie, False, Queue.Queue),
|
||||
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW, itemtypes.Show, False, Queue.Queue),
|
||||
(v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW, itemtypes.Season, False, Queue.Queue),
|
||||
(v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW, itemtypes.Episode, False, Queue.Queue)
|
||||
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE),
|
||||
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW),
|
||||
(v.PLEX_TYPE_SEASON, v.PLEX_TYPE_SHOW),
|
||||
(v.PLEX_TYPE_EPISODE, v.PLEX_TYPE_SHOW)
|
||||
]
|
||||
if app.SYNC.enable_music:
|
||||
kinds.extend([
|
||||
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False, Queue.Queue),
|
||||
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, backgroundthread.OrderedQueue),
|
||||
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST),
|
||||
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST),
|
||||
])
|
||||
# ADD NEW ITEMS
|
||||
# Already start setting up the iterators. We need to enforce
|
||||
# syncing e.g. show before season before episode
|
||||
iterator_queue = Queue.Queue()
|
||||
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
||||
None,
|
||||
kinds,
|
||||
iterator_queue)
|
||||
backgroundthread.BGThreader.addTask(task)
|
||||
while True:
|
||||
self.section_success = True
|
||||
section = iterator_queue.get()
|
||||
iterator_queue.task_done()
|
||||
if section is None:
|
||||
break
|
||||
# Setup our variables
|
||||
self.plex_type = section.plex_type
|
||||
self.section_type = section.section_type
|
||||
self.context = section.context
|
||||
self.get_children = section.get_children
|
||||
self.queue = section.Queue()
|
||||
# Now do the heavy lifting
|
||||
if self.should_cancel() or not self.addupdate_section(section):
|
||||
return False
|
||||
if self.section_success:
|
||||
# Need to check because a thread might have missed to get
|
||||
# some items from the PMS
|
||||
with PlexDB() as plexdb:
|
||||
# Set the new time mark for the next delta sync
|
||||
plexdb.update_section_last_sync(section.section_id,
|
||||
self.current_sync)
|
||||
backgroundthread.KillableThread(
|
||||
target=self.threaded_get_iterators,
|
||||
args=(kinds, self.section_queue, False)).start()
|
||||
# Do the heavy lifting
|
||||
self.processing_loop_new_and_changed_items()
|
||||
common.update_kodi_library(video=True, music=True)
|
||||
if self.should_cancel() or not self.successful:
|
||||
return
|
||||
|
||||
# Sync Plex playlists to Kodi and vice-versa
|
||||
if common.PLAYLIST_SYNC_ENABLED:
|
||||
|
@ -357,48 +213,29 @@ class FullSync(backgroundthread.KillableThread):
|
|||
self.dialog = xbmcgui.DialogProgressBG()
|
||||
# "Synching playlists"
|
||||
self.dialog.create(utils.lang(39715))
|
||||
if not playlists.full_sync():
|
||||
return False
|
||||
if not playlists.full_sync() or self.should_cancel():
|
||||
return
|
||||
|
||||
# SYNC PLAYSTATE of ALL items (otherwise we won't pick up on items that
|
||||
# were set to unwatched). Also mark all items on the PMS to be able
|
||||
# to delete the ones still in Kodi
|
||||
LOG.info('Start synching playstate and userdata for every item')
|
||||
# In order to not delete all your songs again
|
||||
LOG.debug('Start synching playstate and userdata for every item')
|
||||
if app.SYNC.enable_music:
|
||||
# We don't need to enforce the album order now
|
||||
kinds.pop(5)
|
||||
# In order to not delete all your songs again
|
||||
kinds.extend([
|
||||
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True, Queue.Queue),
|
||||
(v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST, itemtypes.Song, True, Queue.Queue),
|
||||
(v.PLEX_TYPE_SONG, v.PLEX_TYPE_ARTIST),
|
||||
])
|
||||
# Make sure we're not showing an item's title in the sync dialog
|
||||
self.title = ''
|
||||
self.threader.shutdown()
|
||||
self.threader = None
|
||||
if not self.show_dialog_userdata and self.dialog:
|
||||
# Close the progress indicator dialog
|
||||
self.dialog.close()
|
||||
self.dialog = None
|
||||
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
||||
None,
|
||||
kinds,
|
||||
iterator_queue,
|
||||
all_items=True)
|
||||
backgroundthread.BGThreader.addTask(task)
|
||||
while True:
|
||||
section = iterator_queue.get()
|
||||
iterator_queue.task_done()
|
||||
if section is None:
|
||||
break
|
||||
# Setup our variables
|
||||
self.plex_type = section.plex_type
|
||||
self.section_type = section.section_type
|
||||
self.context = section.context
|
||||
self.get_children = section.get_children
|
||||
# Now do the heavy lifting
|
||||
if self.should_cancel() or not self.playstate_per_section(section):
|
||||
return False
|
||||
backgroundthread.KillableThread(
|
||||
target=self.threaded_get_iterators,
|
||||
args=(kinds, self.section_queue, True)).start()
|
||||
self.processing_loop_playstates()
|
||||
if self.should_cancel() or not self.successful:
|
||||
return
|
||||
|
||||
# Delete movies that are not on Plex anymore
|
||||
LOG.debug('Looking for items to delete')
|
||||
|
@ -417,60 +254,49 @@ class FullSync(backgroundthread.KillableThread):
|
|||
for plex_type, context in kinds:
|
||||
# Delete movies that are not on Plex anymore
|
||||
while True:
|
||||
with context(self.current_sync) as ctx:
|
||||
plex_ids = list(ctx.plexdb.plex_id_by_last_sync(plex_type,
|
||||
self.current_sync,
|
||||
BATCH_SIZE))
|
||||
with context(self.current_time) as ctx:
|
||||
plex_ids = list(
|
||||
ctx.plexdb.plex_id_by_last_sync(plex_type,
|
||||
self.current_time,
|
||||
BATCH_SIZE))
|
||||
for plex_id in plex_ids:
|
||||
if self.should_cancel():
|
||||
return False
|
||||
return
|
||||
ctx.remove(plex_id, plex_type)
|
||||
if len(plex_ids) < BATCH_SIZE:
|
||||
break
|
||||
LOG.debug('Done deleting')
|
||||
return True
|
||||
LOG.debug('Done looking for items to delete')
|
||||
|
||||
def run(self):
|
||||
app.APP.register_thread(self)
|
||||
LOG.info('Running library sync with repair=%s', self.repair)
|
||||
try:
|
||||
self._run()
|
||||
self.run_full_library_sync()
|
||||
finally:
|
||||
app.APP.deregister_thread(self)
|
||||
LOG.info('Done full_sync')
|
||||
LOG.info('Library sync done. successful: %s', self.successful)
|
||||
|
||||
@utils.log_time
|
||||
def _run(self):
|
||||
self.current_sync = timing.plex_now()
|
||||
# Get latest Plex libraries and build playlist and video node files
|
||||
if self.should_cancel() or not sections.sync_from_pms(self):
|
||||
return
|
||||
self.successful = True
|
||||
def run_full_library_sync(self):
|
||||
try:
|
||||
if self.show_dialog:
|
||||
self.dialog = xbmcgui.DialogProgressBG()
|
||||
self.dialog.create(utils.lang(39714))
|
||||
|
||||
# Actual syncing - do only new items first
|
||||
LOG.info('Running full_library_sync with repair=%s',
|
||||
self.repair)
|
||||
if self.should_cancel() or not self.full_library_sync():
|
||||
# Get latest Plex libraries and build playlist and video node files
|
||||
if self.should_cancel() or not sections.sync_from_pms(self):
|
||||
return
|
||||
if self.should_cancel():
|
||||
self.successful = False
|
||||
return
|
||||
self.full_library_sync()
|
||||
finally:
|
||||
common.update_kodi_library(video=True, music=True)
|
||||
if self.dialog:
|
||||
self.dialog.close()
|
||||
if self.threader:
|
||||
self.threader.shutdown()
|
||||
self.threader = None
|
||||
if not self.successful and not self.should_cancel():
|
||||
# "ERROR in library sync"
|
||||
utils.dialog('notification',
|
||||
heading='{plex}',
|
||||
message=utils.lang(39410),
|
||||
icon='{error}')
|
||||
if self.callback:
|
||||
self.callback(self.successful)
|
||||
self.callback(self.successful)
|
||||
|
||||
|
||||
def start(show_dialog, repair=False, callback=None):
|
||||
|
|
|
@ -2,73 +2,46 @@
|
|||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
|
||||
from . import common
|
||||
from ..plex_api import API
|
||||
from .. import plex_functions as PF, backgroundthread, utils, variables as v
|
||||
|
||||
|
||||
LOG = getLogger("PLEX." + __name__)
|
||||
from .. import backgroundthread, plex_functions as PF, utils, variables as v
|
||||
from .. import app
|
||||
|
||||
LOG = getLogger('PLEX.sync.get_metadata')
|
||||
LOCK = backgroundthread.threading.Lock()
|
||||
# List of tuples: (collection index [as in an item's metadata with "Collection
|
||||
# id"], collection plex id)
|
||||
COLLECTION_MATCH = None
|
||||
# Dict with entries of the form <collection index>: <collection xml>
|
||||
COLLECTION_XMLS = {}
|
||||
|
||||
|
||||
def reset_collections():
|
||||
"""
|
||||
Collections seem unique to Plex sections
|
||||
"""
|
||||
global LOCK, COLLECTION_MATCH, COLLECTION_XMLS
|
||||
with LOCK:
|
||||
COLLECTION_MATCH = None
|
||||
COLLECTION_XMLS = {}
|
||||
|
||||
|
||||
class GetMetadataTask(backgroundthread.Task):
|
||||
class GetMetadataThread(common.LibrarySyncMixin,
|
||||
backgroundthread.KillableThread):
|
||||
"""
|
||||
Threaded download of Plex XML metadata for a certain library item.
|
||||
Fills the queue with the downloaded etree XML objects
|
||||
|
||||
Input:
|
||||
queue Queue.Queue() object where this thread will store
|
||||
the downloaded metadata XMLs as etree objects
|
||||
"""
|
||||
def __init__(self, queue, plex_id, plex_type, get_children=False,
|
||||
count=None):
|
||||
self.queue = queue
|
||||
self.plex_id = plex_id
|
||||
self.plex_type = plex_type
|
||||
self.get_children = get_children
|
||||
self.count = count
|
||||
super(GetMetadataTask, self).__init__()
|
||||
|
||||
def suspend(self, block=False, timeout=None):
|
||||
"""
|
||||
Let's NOT suspend sync threads but immediately terminate them
|
||||
"""
|
||||
self.cancel()
|
||||
def __init__(self, get_metadata_queue, processing_queue):
|
||||
self.get_metadata_queue = get_metadata_queue
|
||||
self.processing_queue = processing_queue
|
||||
super(GetMetadataThread, self).__init__()
|
||||
|
||||
def _collections(self, item):
|
||||
global COLLECTION_MATCH, COLLECTION_XMLS
|
||||
api = API(item['xml'][0])
|
||||
if COLLECTION_MATCH is None:
|
||||
COLLECTION_MATCH = PF.collections(api.library_section_id())
|
||||
if COLLECTION_MATCH is None:
|
||||
collection_match = item['section'].collection_match
|
||||
collection_xmls = item['section'].collection_xmls
|
||||
if collection_match is None:
|
||||
collection_match = PF.collections(api.library_section_id())
|
||||
if collection_match is None:
|
||||
LOG.error('Could not download collections')
|
||||
return
|
||||
# Extract what we need to know
|
||||
COLLECTION_MATCH = \
|
||||
collection_match = \
|
||||
[(utils.cast(int, x.get('index')),
|
||||
utils.cast(int, x.get('ratingKey'))) for x in COLLECTION_MATCH]
|
||||
utils.cast(int, x.get('ratingKey'))) for x in collection_match]
|
||||
item['children'] = {}
|
||||
for plex_set_id, set_name in api.collections():
|
||||
if self.should_cancel():
|
||||
return
|
||||
if plex_set_id not in COLLECTION_XMLS:
|
||||
if plex_set_id not in collection_xmls:
|
||||
# Get Plex metadata for collections - a pain
|
||||
for index, collection_plex_id in COLLECTION_MATCH:
|
||||
for index, collection_plex_id in collection_match:
|
||||
if index == plex_set_id:
|
||||
collection_xml = PF.GetPlexMetadata(collection_plex_id)
|
||||
try:
|
||||
|
@ -77,54 +50,84 @@ class GetMetadataTask(backgroundthread.Task):
|
|||
LOG.error('Could not get collection %s %s',
|
||||
collection_plex_id, set_name)
|
||||
continue
|
||||
COLLECTION_XMLS[plex_set_id] = collection_xml
|
||||
collection_xmls[plex_set_id] = collection_xml
|
||||
break
|
||||
else:
|
||||
LOG.error('Did not find Plex collection %s %s',
|
||||
plex_set_id, set_name)
|
||||
continue
|
||||
item['children'][plex_set_id] = COLLECTION_XMLS[plex_set_id]
|
||||
item['children'][plex_set_id] = collection_xmls[plex_set_id]
|
||||
|
||||
def _process_abort(self, count, section):
|
||||
# Make sure other threads will also receive sentinel
|
||||
self.get_metadata_queue.put(None)
|
||||
if count is not None:
|
||||
self._process_skipped_item(count, section)
|
||||
|
||||
def _process_skipped_item(self, count, section):
|
||||
section.sync_successful = False
|
||||
# Add a "dummy" item so we're not skipping a beat
|
||||
self.processing_queue.put((count, {'section': section, 'xml': None}))
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
Do the work
|
||||
"""
|
||||
if self.should_cancel():
|
||||
return
|
||||
# Download Metadata
|
||||
item = {
|
||||
'xml': PF.GetPlexMetadata(self.plex_id),
|
||||
'children': None
|
||||
}
|
||||
if item['xml'] is None:
|
||||
# Did not receive a valid XML - skip that item for now
|
||||
LOG.error("Could not get metadata for %s. Skipping that item "
|
||||
"for now", self.plex_id)
|
||||
return
|
||||
elif item['xml'] == 401:
|
||||
LOG.error('HTTP 401 returned by PMS. Too much strain? '
|
||||
'Cancelling sync for now')
|
||||
utils.window('plex_scancrashed', value='401')
|
||||
return
|
||||
if not self.should_cancel() and self.plex_type == v.PLEX_TYPE_MOVIE:
|
||||
# Check for collections/sets
|
||||
collections = False
|
||||
for child in item['xml'][0]:
|
||||
if child.tag == 'Collection':
|
||||
collections = True
|
||||
break
|
||||
if collections:
|
||||
global LOCK
|
||||
with LOCK:
|
||||
self._collections(item)
|
||||
if not self.should_cancel() and self.get_children:
|
||||
children_xml = PF.GetAllPlexChildren(self.plex_id)
|
||||
LOG.debug('Starting %s thread', self.__class__.__name__)
|
||||
app.APP.register_thread(self)
|
||||
try:
|
||||
self._run()
|
||||
finally:
|
||||
app.APP.deregister_thread(self)
|
||||
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
|
||||
|
||||
def _run(self):
|
||||
while True:
|
||||
item = self.get_metadata_queue.get()
|
||||
try:
|
||||
children_xml[0].attrib
|
||||
except (TypeError, IndexError, AttributeError):
|
||||
LOG.error('Could not get children for Plex id %s',
|
||||
self.plex_id)
|
||||
else:
|
||||
item['children'] = children_xml
|
||||
if not self.should_cancel():
|
||||
self.queue.put((self.count, item))
|
||||
if item is None or self.should_cancel():
|
||||
self._process_abort(item[0] if item else None,
|
||||
item[2] if item else None)
|
||||
break
|
||||
count, plex_id, section = item
|
||||
item = {
|
||||
'xml': PF.GetPlexMetadata(plex_id), # This will block
|
||||
'children': None,
|
||||
'section': section
|
||||
}
|
||||
if item['xml'] is None:
|
||||
# Did not receive a valid XML - skip that item for now
|
||||
LOG.error("Could not get metadata for %s. Skipping item "
|
||||
"for now", plex_id)
|
||||
self._process_skipped_item(count, section)
|
||||
continue
|
||||
elif item['xml'] == 401:
|
||||
LOG.error('HTTP 401 returned by PMS. Too much strain? '
|
||||
'Cancelling sync for now')
|
||||
utils.window('plex_scancrashed', value='401')
|
||||
self._process_abort(count, section)
|
||||
break
|
||||
if section.plex_type == v.PLEX_TYPE_MOVIE:
|
||||
# Check for collections/sets
|
||||
collections = False
|
||||
for child in item['xml'][0]:
|
||||
if child.tag == 'Collection':
|
||||
collections = True
|
||||
break
|
||||
if collections:
|
||||
with LOCK:
|
||||
self._collections(item)
|
||||
if section.get_children:
|
||||
if self.should_cancel():
|
||||
self._process_abort(count, section)
|
||||
break
|
||||
children_xml = PF.GetAllPlexChildren(plex_id) # Will block
|
||||
try:
|
||||
children_xml[0].attrib
|
||||
except (TypeError, IndexError, AttributeError):
|
||||
LOG.error('Could not get children for Plex id %s',
|
||||
plex_id)
|
||||
self._process_skipped_item(count, section)
|
||||
continue
|
||||
else:
|
||||
item['children'] = children_xml
|
||||
self.processing_queue.put((count, item))
|
||||
finally:
|
||||
self.get_metadata_queue.task_done()
|
||||
|
|
104
resources/lib/library_sync/process_metadata.py
Normal file
104
resources/lib/library_sync/process_metadata.py
Normal file
|
@ -0,0 +1,104 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
|
||||
from . import common, sections
|
||||
from ..plex_db import PlexDB
|
||||
from .. import backgroundthread, app
|
||||
|
||||
LOG = getLogger('PLEX.sync.process_metadata')
|
||||
|
||||
COMMIT_TO_DB_EVERY_X_ITEMS = 500
|
||||
|
||||
|
||||
class ProcessMetadataThread(common.LibrarySyncMixin,
|
||||
backgroundthread.KillableThread):
|
||||
"""
|
||||
Invoke once in order to process the received PMS metadata xmls
|
||||
"""
|
||||
def __init__(self, current_time, processing_queue, update_progressbar):
|
||||
self.current_time = current_time
|
||||
self.processing_queue = processing_queue
|
||||
self.update_progressbar = update_progressbar
|
||||
self.last_section = sections.Section()
|
||||
self.successful = True
|
||||
super(ProcessMetadataThread, self).__init__()
|
||||
|
||||
def start_section(self, section):
|
||||
if section != self.last_section:
|
||||
if self.last_section:
|
||||
self.finish_last_section()
|
||||
LOG.debug('Start or continue processing section %s', section)
|
||||
self.last_section = section
|
||||
# Warn the user for this new section if we cannot access a file
|
||||
app.SYNC.path_verified = False
|
||||
else:
|
||||
LOG.debug('Resume processing section %s', section)
|
||||
|
||||
def finish_last_section(self):
|
||||
if (not self.should_cancel() and self.last_section and
|
||||
self.last_section.sync_successful):
|
||||
# Check for should_cancel() because we cannot be sure that we
|
||||
# processed every item of the section
|
||||
with PlexDB() as plexdb:
|
||||
# Set the new time mark for the next delta sync
|
||||
plexdb.update_section_last_sync(self.last_section.section_id,
|
||||
self.current_time)
|
||||
LOG.info('Finished processing section successfully: %s',
|
||||
self.last_section)
|
||||
elif self.last_section and not self.last_section.sync_successful:
|
||||
LOG.warn('Sync not successful for section %s', self.last_section)
|
||||
self.successful = False
|
||||
|
||||
def _get(self):
|
||||
item = {'xml': None}
|
||||
while not self.should_cancel() and item and item['xml'] is None:
|
||||
item = self.processing_queue.get()
|
||||
self.processing_queue.task_done()
|
||||
return item
|
||||
|
||||
def run(self):
|
||||
LOG.debug('Starting %s thread', self.__class__.__name__)
|
||||
app.APP.register_thread(self)
|
||||
try:
|
||||
self._run()
|
||||
except Exception:
|
||||
from .. import utils
|
||||
utils.ERROR(notify=True)
|
||||
finally:
|
||||
app.APP.deregister_thread(self)
|
||||
LOG.debug('##===---- %s Stopped ----===##', self.__class__.__name__)
|
||||
|
||||
def _run(self):
|
||||
# There are 2 sentinels: None for aborting/ending this thread, the dict
|
||||
# {'section': section, 'xml': None} for skipped/invalid items
|
||||
item = self._get()
|
||||
if item:
|
||||
section = item['section']
|
||||
processed = 0
|
||||
self.start_section(section)
|
||||
while not self.should_cancel():
|
||||
if item is None:
|
||||
break
|
||||
elif item['section'] != section:
|
||||
# We received an entirely new section
|
||||
self.start_section(item['section'])
|
||||
section = item['section']
|
||||
with section.context(self.current_time) as context:
|
||||
while not self.should_cancel():
|
||||
if item is None or item['section'] != section:
|
||||
break
|
||||
self.update_progressbar(section,
|
||||
item['xml'][0].get('title'),
|
||||
section.count)
|
||||
context.add_update(item['xml'][0],
|
||||
section_name=section.name,
|
||||
section_id=section.section_id,
|
||||
children=item['children'])
|
||||
processed += 1
|
||||
section.count += 1
|
||||
if processed == COMMIT_TO_DB_EVERY_X_ITEMS:
|
||||
processed = 0
|
||||
context.commit()
|
||||
item = self._get()
|
||||
self.finish_last_section()
|
|
@ -54,6 +54,8 @@ class Section(object):
|
|||
self.content = None # unicode
|
||||
# Setting the section_type WILL re_set sync_to_kodi!
|
||||
self._section_type = None # unicode
|
||||
# E.g. "season" or "movie" (translated)
|
||||
self.section_type_text = None
|
||||
# Do we sync all items of this section to the Kodi DB?
|
||||
# This will be set with section_type!!
|
||||
self.sync_to_kodi = None # bool
|
||||
|
@ -77,13 +79,9 @@ class Section(object):
|
|||
self.order = None
|
||||
# Original PMS xml for this section, including children
|
||||
self.xml = None
|
||||
# Attributes that will be initialized later by full_sync.py
|
||||
self.iterator = None
|
||||
self.context = None
|
||||
self.get_children = None
|
||||
# A section_type encompasses possible several plex_types! E.g. shows
|
||||
# contain shows, seasons, episodes
|
||||
self.plex_type = None
|
||||
self._plex_type = None
|
||||
if xml_element is not None:
|
||||
self.from_xml(xml_element)
|
||||
elif section_db_element:
|
||||
|
@ -106,9 +104,14 @@ class Section(object):
|
|||
self.section_type is not None)
|
||||
|
||||
def __eq__(self, section):
|
||||
"""
|
||||
Sections compare equal if their section_id, name and plex_type (first
|
||||
prio) OR section_type (if there is no plex_type is set) compare equal
|
||||
"""
|
||||
return (self.section_id == section.section_id and
|
||||
self.name == section.name and
|
||||
self.section_type == section.section_type)
|
||||
(self.plex_type == section.plex_type if self.plex_type else
|
||||
self.section_type == section.section_type))
|
||||
|
||||
def __ne__(self, section):
|
||||
return not self == section
|
||||
|
@ -140,6 +143,15 @@ class Section(object):
|
|||
else:
|
||||
self.sync_to_kodi = True
|
||||
|
||||
@property
|
||||
def plex_type(self):
|
||||
return self._plex_type
|
||||
|
||||
@plex_type.setter
|
||||
def plex_type(self, value):
|
||||
self._plex_type = value
|
||||
self.section_type_text = utils.lang(v.TRANSLATION_FROM_PLEXTYPE[value])
|
||||
|
||||
@property
|
||||
def index(self):
|
||||
return self._index
|
||||
|
@ -431,6 +443,39 @@ class Section(object):
|
|||
self.remove_from_plex()
|
||||
|
||||
|
||||
def _get_children(plex_type):
|
||||
if plex_type == v.PLEX_TYPE_ALBUM:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def get_sync_section(section, plex_type):
|
||||
"""
|
||||
Deep-copies section and adds certain arguments in order to prep section
|
||||
for the library sync
|
||||
"""
|
||||
section = copy.deepcopy(section)
|
||||
section.plex_type = plex_type
|
||||
section.context = itemtypes.ITEMTYPE_FROM_PLEXTYPE[plex_type]
|
||||
section.get_children = _get_children(plex_type)
|
||||
# Some more init stuff
|
||||
# Has sync for this section been successful?
|
||||
section.sync_successful = True
|
||||
# List of tuples: (collection index [as in an item's metadata with
|
||||
# "Collection id"], collection plex id)
|
||||
section.collection_match = None
|
||||
# Dict with entries of the form <collection index>: <collection xml>
|
||||
section.collection_xmls = {}
|
||||
# Keep count during sync
|
||||
section.count = 0
|
||||
# Total number of items that we need to sync
|
||||
section.number_of_items = 0
|
||||
# Iterator to get one sync item after the other
|
||||
section.iterator = None
|
||||
return section
|
||||
|
||||
|
||||
def force_full_sync():
|
||||
"""
|
||||
Resets the sync timestamp for all sections to 0, thus forcing a subsequent
|
||||
|
|
Loading…
Reference in a new issue