diff --git a/resources/lib/plex_functions.py b/resources/lib/plex_functions.py index 7d644d8a..4b6f4274 100644 --- a/resources/lib/plex_functions.py +++ b/resources/lib/plex_functions.py @@ -11,9 +11,7 @@ from threading import Thread from xbmc import sleep from .downloadutils import DownloadUtils as DU -from . import utils -from . import plex_tv -from . import variables as v +from . import backgroundthread, utils, plex_tv, variables as v ############################################################################### LOG = getLogger('PLEX.plex_functions') @@ -528,6 +526,26 @@ def GetPlexSectionResults(viewId, args=None): return DownloadChunks(url) +class DownloadChunk(backgroundthread.Task): + """ + This task will also be executed while library sync is suspended! + """ + def setup(self, url, args, callback): + self.url = url + self.args = args + self.callback = callback + + def run(self): + xml = DU().downloadUrl(self.url, parameters=self.args) + try: + xml.attrib + except AttributeError: + LOG.error('Error while downloading chunks: %s, args: %s', + self.url, self.args) + xml = None + self.callback(xml) + + class DownloadGen(object): """ Special iterator object that will yield all child xmls piece-wise. It also @@ -537,6 +555,8 @@ class DownloadGen(object): """ def __init__(self, url, plex_type=None, last_viewed_at=None, updated_at=None, args=None): + self.lock = backgroundthread.threading.Lock() + self.pending_lock = backgroundthread.threading.Lock() self._args = args or {} url += '?' if plex_type: @@ -545,11 +565,13 @@ class DownloadGen(object): url = '%slastViewedAt>=%s&' % (url, last_viewed_at) if updated_at: url = '%supdatedAt>=%s&' % (url, updated_at) - if url.endswith('?') or url.endswith('&'): - url = url[:-1] - self._url = url + self._url = url[:-1] self._pos = 0 + self._current = 0 self._exhausted = False + # Start download the next chunk once we've reached 10% of the current + # xml's content + self._threshold = int(CONTAINERSIZE / 10) self._download_chunk() self.attrib = deepcopy(self.xml.attrib) @@ -560,14 +582,31 @@ class DownloadGen(object): 'sort': 'id', # Entries are sorted by plex_id 'excludeAllLeaves': 1 # PMS wont attach a first summary child }) - self.xml = DU().downloadUrl(self._url, parameters=self._args) - try: - self.xml.attrib - except AttributeError: - LOG.error('Error while downloading chunks: %s, args: %s', - self._url, self._args) - raise RuntimeError('Error while downloading chunks for %s' - % self._url) + if self._pos == 0: + self.xml = DU().downloadUrl(self._url, parameters=self._args) + try: + self.xml.attrib + except AttributeError: + LOG.error('Error while downloading chunks: %s, args: %s', + self._url, self._args) + raise RuntimeError('Error while downloading chunks for %s' + % self._url) + if len(self.xml) < CONTAINERSIZE: + self._exhausted = True + else: + self.pending_lock.acquire() + task = DownloadChunk() + task.setup(self._url, self._args, self.on_chunk_downloaded) + backgroundthread.BGThreader.addTask(task) + + def on_chunk_downloaded(self, xml): + if xml: + if len(xml) < CONTAINERSIZE: + self._exhausted = True + with self.lock: + for child in xml: + self.xml.append(child) + self.pending_lock.release() def __iter__(self): return self @@ -576,20 +615,25 @@ class DownloadGen(object): return self.__next__() def __next__(self): - if len(self.xml): - child = self.xml[0] - self.xml.remove(child) - return child - elif self._exhausted: - raise StopIteration - else: + if (not self._exhausted and + self._current % CONTAINERSIZE == self._threshold): + # Kick off download of next chunk self._pos += CONTAINERSIZE self._download_chunk() + while True: + if len(self.xml): + child = self.xml[0] + with self.lock: + self.xml.remove(child) + self._current += 1 + return child + # Currently no items left, BUT some could still be downloading + self.pending_lock.acquire() + # Above will block until download finished - release immediately + # again + self.pending_lock.release() if not len(self.xml): raise StopIteration - if len(self.xml) < CONTAINERSIZE: - self._exhausted = True - return self.__next__() def get(self, key, default=None): return self.attrib.get(key, default)