Download entire xmls asynchronously, not just a piece
This commit is contained in:
parent
14359dd5f4
commit
18891a67fc
1 changed files with 23 additions and 36 deletions
|
@ -556,8 +556,7 @@ class DownloadGen(object):
|
|||
def __init__(self, url, plex_type=None, last_viewed_at=None,
|
||||
updated_at=None, args=None):
|
||||
self.lock = backgroundthread.threading.Lock()
|
||||
self.pending_lock = backgroundthread.threading.Lock()
|
||||
self._args = args or {}
|
||||
self.args = args or {}
|
||||
url += '?'
|
||||
if plex_type:
|
||||
url = '%stype=%s&' % (url, v.PLEX_TYPE_NUMBER_FROM_PLEX_TYPE[plex_type])
|
||||
|
@ -565,48 +564,46 @@ class DownloadGen(object):
|
|||
url = '%slastViewedAt>=%s&' % (url, last_viewed_at)
|
||||
if updated_at:
|
||||
url = '%supdatedAt>=%s&' % (url, updated_at)
|
||||
self._url = url[:-1]
|
||||
self._pos = 0
|
||||
self._current = 0
|
||||
self._exhausted = False
|
||||
# Start download the next chunk once we've reached 10% of the current
|
||||
# xml's content
|
||||
self._threshold = int(CONTAINERSIZE / 10)
|
||||
self._download_chunk()
|
||||
self.url = url[:-1]
|
||||
self._download_chunk(start=0)
|
||||
self.attrib = deepcopy(self.xml.attrib)
|
||||
self.total = int(self.attrib['totalSize'])
|
||||
# Will keep track whether we still have results incoming
|
||||
self.pending_counter = []
|
||||
for i in range(CONTAINERSIZE,
|
||||
self.total + (CONTAINERSIZE - self.total % CONTAINERSIZE),
|
||||
CONTAINERSIZE):
|
||||
self._download_chunk(start=i)
|
||||
self.pending_counter.append(None)
|
||||
|
||||
def _download_chunk(self):
|
||||
self._args.update({
|
||||
def _download_chunk(self, start):
|
||||
self.args.update({
|
||||
'X-Plex-Container-Size': CONTAINERSIZE,
|
||||
'X-Plex-Container-Start': self._pos,
|
||||
'X-Plex-Container-Start': start,
|
||||
'sort': 'id', # Entries are sorted by plex_id
|
||||
'excludeAllLeaves': 1 # PMS wont attach a first summary child
|
||||
})
|
||||
if self._pos == 0:
|
||||
self.xml = DU().downloadUrl(self._url, parameters=self._args)
|
||||
if start == 0:
|
||||
# We need the result NOW
|
||||
self.xml = DU().downloadUrl(self.url, parameters=self.args)
|
||||
try:
|
||||
self.xml.attrib
|
||||
except AttributeError:
|
||||
LOG.error('Error while downloading chunks: %s, args: %s',
|
||||
self._url, self._args)
|
||||
self.url, self.args)
|
||||
raise RuntimeError('Error while downloading chunks for %s'
|
||||
% self._url)
|
||||
if len(self.xml) < CONTAINERSIZE:
|
||||
self._exhausted = True
|
||||
% self.url)
|
||||
else:
|
||||
self.pending_lock.acquire()
|
||||
task = DownloadChunk()
|
||||
task.setup(self._url, self._args, self.on_chunk_downloaded)
|
||||
task.setup(self.url, self.args, self.on_chunk_downloaded)
|
||||
backgroundthread.BGThreader.addTask(task)
|
||||
|
||||
def on_chunk_downloaded(self, xml):
|
||||
if xml:
|
||||
if len(xml) < CONTAINERSIZE:
|
||||
self._exhausted = True
|
||||
with self.lock:
|
||||
for child in xml:
|
||||
self.xml.append(child)
|
||||
self.pending_lock.release()
|
||||
self.pending_counter.pop()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
@ -615,24 +612,14 @@ class DownloadGen(object):
|
|||
return self.__next__()
|
||||
|
||||
def __next__(self):
|
||||
if (not self._exhausted and
|
||||
self._current % CONTAINERSIZE == self._threshold):
|
||||
# Kick off download of next chunk
|
||||
self._pos += CONTAINERSIZE
|
||||
self._download_chunk()
|
||||
while True:
|
||||
if len(self.xml):
|
||||
child = self.xml[0]
|
||||
with self.lock:
|
||||
self.xml.remove(child)
|
||||
self._current += 1
|
||||
return child
|
||||
# Currently no items left, BUT some could still be downloading
|
||||
self.pending_lock.acquire()
|
||||
# Above will block until download finished - release immediately
|
||||
# again
|
||||
self.pending_lock.release()
|
||||
if not len(self.xml):
|
||||
sleep(20)
|
||||
if not len(self.pending_counter) and not len(self.xml):
|
||||
raise StopIteration
|
||||
|
||||
def get(self, key, default=None):
|
||||
|
|
Loading…
Reference in a new issue