Download PMS xml chunks asynchronously

This commit is contained in:
croneter 2018-11-09 14:39:43 +01:00
parent 136d242780
commit e624edc7ae

View file

@ -11,9 +11,7 @@ from threading import Thread
from xbmc import sleep from xbmc import sleep
from .downloadutils import DownloadUtils as DU from .downloadutils import DownloadUtils as DU
from . import utils from . import backgroundthread, utils, plex_tv, variables as v
from . import plex_tv
from . import variables as v
############################################################################### ###############################################################################
LOG = getLogger('PLEX.plex_functions') LOG = getLogger('PLEX.plex_functions')
@ -528,6 +526,26 @@ def GetPlexSectionResults(viewId, args=None):
return DownloadChunks(url) return DownloadChunks(url)
class DownloadChunk(backgroundthread.Task):
"""
This task will also be executed while library sync is suspended!
"""
def setup(self, url, args, callback):
self.url = url
self.args = args
self.callback = callback
def run(self):
xml = DU().downloadUrl(self.url, parameters=self.args)
try:
xml.attrib
except AttributeError:
LOG.error('Error while downloading chunks: %s, args: %s',
self.url, self.args)
xml = None
self.callback(xml)
class DownloadGen(object): class DownloadGen(object):
""" """
Special iterator object that will yield all child xmls piece-wise. It also Special iterator object that will yield all child xmls piece-wise. It also
@ -537,6 +555,8 @@ class DownloadGen(object):
""" """
def __init__(self, url, plex_type=None, last_viewed_at=None, def __init__(self, url, plex_type=None, last_viewed_at=None,
updated_at=None, args=None): updated_at=None, args=None):
self.lock = backgroundthread.threading.Lock()
self.pending_lock = backgroundthread.threading.Lock()
self._args = args or {} self._args = args or {}
url += '?' url += '?'
if plex_type: if plex_type:
@ -545,11 +565,13 @@ class DownloadGen(object):
url = '%slastViewedAt>=%s&' % (url, last_viewed_at) url = '%slastViewedAt>=%s&' % (url, last_viewed_at)
if updated_at: if updated_at:
url = '%supdatedAt>=%s&' % (url, updated_at) url = '%supdatedAt>=%s&' % (url, updated_at)
if url.endswith('?') or url.endswith('&'): self._url = url[:-1]
url = url[:-1]
self._url = url
self._pos = 0 self._pos = 0
self._current = 0
self._exhausted = False self._exhausted = False
# Start download the next chunk once we've reached 10% of the current
# xml's content
self._threshold = int(CONTAINERSIZE / 10)
self._download_chunk() self._download_chunk()
self.attrib = deepcopy(self.xml.attrib) self.attrib = deepcopy(self.xml.attrib)
@ -560,6 +582,7 @@ class DownloadGen(object):
'sort': 'id', # Entries are sorted by plex_id 'sort': 'id', # Entries are sorted by plex_id
'excludeAllLeaves': 1 # PMS wont attach a first summary child 'excludeAllLeaves': 1 # PMS wont attach a first summary child
}) })
if self._pos == 0:
self.xml = DU().downloadUrl(self._url, parameters=self._args) self.xml = DU().downloadUrl(self._url, parameters=self._args)
try: try:
self.xml.attrib self.xml.attrib
@ -568,6 +591,22 @@ class DownloadGen(object):
self._url, self._args) self._url, self._args)
raise RuntimeError('Error while downloading chunks for %s' raise RuntimeError('Error while downloading chunks for %s'
% self._url) % self._url)
if len(self.xml) < CONTAINERSIZE:
self._exhausted = True
else:
self.pending_lock.acquire()
task = DownloadChunk()
task.setup(self._url, self._args, self.on_chunk_downloaded)
backgroundthread.BGThreader.addTask(task)
def on_chunk_downloaded(self, xml):
if xml:
if len(xml) < CONTAINERSIZE:
self._exhausted = True
with self.lock:
for child in xml:
self.xml.append(child)
self.pending_lock.release()
def __iter__(self): def __iter__(self):
return self return self
@ -576,20 +615,25 @@ class DownloadGen(object):
return self.__next__() return self.__next__()
def __next__(self): def __next__(self):
if len(self.xml): if (not self._exhausted and
child = self.xml[0] self._current % CONTAINERSIZE == self._threshold):
self.xml.remove(child) # Kick off download of next chunk
return child
elif self._exhausted:
raise StopIteration
else:
self._pos += CONTAINERSIZE self._pos += CONTAINERSIZE
self._download_chunk() self._download_chunk()
while True:
if len(self.xml):
child = self.xml[0]
with self.lock:
self.xml.remove(child)
self._current += 1
return child
# Currently no items left, BUT some could still be downloading
self.pending_lock.acquire()
# Above will block until download finished - release immediately
# again
self.pending_lock.release()
if not len(self.xml): if not len(self.xml):
raise StopIteration raise StopIteration
if len(self.xml) < CONTAINERSIZE:
self._exhausted = True
return self.__next__()
def get(self, key, default=None): def get(self, key, default=None):
return self.attrib.get(key, default) return self.attrib.get(key, default)