Get section overview xml asynchronously
This commit is contained in:
parent
41bbdbc206
commit
07cf25b324
1 changed files with 102 additions and 65 deletions
|
@ -2,6 +2,12 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from __future__ import absolute_import, division, unicode_literals
|
||||
from logging import getLogger
|
||||
import Queue
|
||||
import copy
|
||||
|
||||
from cProfile import Profile
|
||||
from pstats import Stats
|
||||
from StringIO import StringIO
|
||||
|
||||
from .get_metadata import GetMetadataTask, reset_collections
|
||||
from .process_metadata import InitNewSection, UpdateLastSync, ProcessMetadata, \
|
||||
|
@ -84,67 +90,78 @@ class FullSync(common.libsync_mixin):
|
|||
self.queue.put(UpdateUserdata(xml_item))
|
||||
|
||||
@utils.log_time
|
||||
def process_kind(self):
|
||||
"""
|
||||
"""
|
||||
successful = True
|
||||
LOG.debug('Start processing %ss', self.section_type)
|
||||
sects = (x for x in sections.SECTIONS
|
||||
if x['plex_type'] == self.section_type)
|
||||
for section in sects:
|
||||
LOG.debug('Processing library section %s', section)
|
||||
if self.isCanceled():
|
||||
return False
|
||||
if not self.install_sync_done:
|
||||
app.SYNC.path_verified = False
|
||||
try:
|
||||
# Sync new, updated and deleted items
|
||||
self.section_initiated = True
|
||||
iterator = PF.SectionItems(section['section_id'],
|
||||
plex_type=self.plex_type)
|
||||
# Tell the processing thread about this new section
|
||||
queue_info = InitNewSection(self.context,
|
||||
iterator.total,
|
||||
iterator.get('librarySectionTitle'),
|
||||
section['section_id'],
|
||||
self.plex_type)
|
||||
self.queue.put(queue_info)
|
||||
for xml_item in iterator:
|
||||
if self.isCanceled():
|
||||
return False
|
||||
self.process_item(xml_item)
|
||||
except RuntimeError:
|
||||
LOG.error('Could not entirely process section %s', section)
|
||||
successful = False
|
||||
continue
|
||||
LOG.debug('Waiting for download threads to finish')
|
||||
while self.threader.threader.working():
|
||||
app.APP.monitor.waitForAbort(0.1)
|
||||
LOG.debug('Waiting for processing thread to finish section')
|
||||
def process_section(self, section):
|
||||
LOG.debug('Processing library section %s', section)
|
||||
if self.isCanceled():
|
||||
return False
|
||||
if not self.install_sync_done:
|
||||
app.SYNC.path_verified = False
|
||||
try:
|
||||
# Sync new, updated and deleted items
|
||||
self.section_initiated = True
|
||||
iterator = section['iterator_1']
|
||||
# Tell the processing thread about this new section
|
||||
queue_info = InitNewSection(section['context'],
|
||||
iterator.total,
|
||||
iterator.get('librarySectionTitle'),
|
||||
section['section_id'],
|
||||
section['plex_type'])
|
||||
self.queue.put(queue_info)
|
||||
for xml_item in iterator:
|
||||
if self.isCanceled():
|
||||
return False
|
||||
self.process_item(xml_item)
|
||||
except RuntimeError:
|
||||
LOG.error('Could not entirely process section %s', section)
|
||||
return False
|
||||
LOG.debug('Waiting for download threads to finish')
|
||||
while self.threader.threader.working():
|
||||
app.APP.monitor.waitForAbort(0.1)
|
||||
LOG.debug('Waiting for processing thread to finish section')
|
||||
self.queue.join()
|
||||
reset_collections()
|
||||
try:
|
||||
# Sync playstate of every item
|
||||
iterator = section['iterator_2']
|
||||
# Tell the processing thread that we're syncing playstate
|
||||
queue_info = InitNewSection(section['context'],
|
||||
iterator.total,
|
||||
iterator.get('librarySectionTitle'),
|
||||
section['section_id'],
|
||||
section['plex_type'])
|
||||
self.queue.put(queue_info)
|
||||
if section['plex_type'] != v.PLEX_TYPE_ARTIST:
|
||||
self.process_playstate(iterator)
|
||||
self.queue.join()
|
||||
reset_collections()
|
||||
try:
|
||||
# Sync playstate of every item
|
||||
iterator = PF.SectionItems(section['section_id'],
|
||||
plex_type=self.plex_type)
|
||||
# Tell the processing thread that we're syncing playstate
|
||||
queue_info = InitNewSection(self.context,
|
||||
iterator.total,
|
||||
iterator.get('librarySectionTitle'),
|
||||
section['section_id'],
|
||||
self.plex_type)
|
||||
self.queue.put(queue_info)
|
||||
if self.plex_type != v.PLEX_TYPE_ARTIST:
|
||||
self.process_playstate(iterator)
|
||||
self.queue.join()
|
||||
except RuntimeError:
|
||||
LOG.error('Could not process playstate for section %s', section)
|
||||
successful = False
|
||||
continue
|
||||
LOG.debug('Done processing playstate for section')
|
||||
except RuntimeError:
|
||||
LOG.error('Could not process playstate for section %s', section)
|
||||
return False
|
||||
LOG.debug('Done processing playstate for section')
|
||||
return True
|
||||
|
||||
LOG.debug('Finished processing %ss', self.plex_type)
|
||||
return successful
|
||||
def threaded_get_iterators(self, kinds, queue):
|
||||
"""
|
||||
PF.SectionItems is costly, so let's do it asynchronous
|
||||
"""
|
||||
try:
|
||||
for kind in kinds:
|
||||
for section in (x for x in sections.SECTIONS
|
||||
if x['plex_type'] == kind[1]):
|
||||
if self.isCanceled():
|
||||
return
|
||||
element = copy.deepcopy(section)
|
||||
element['section_type'] = element['plex_type']
|
||||
element['plex_type'] = kind[0]
|
||||
element['element_type'] = kind[1]
|
||||
element['context'] = kind[2]
|
||||
element['get_children'] = kind[3]
|
||||
element['iterator_1'] = PF.SectionItems(section['section_id'],
|
||||
plex_type=kind[0])
|
||||
element['iterator_2'] = PF.SectionItems(section['section_id'],
|
||||
plex_type=kind[0])
|
||||
queue.put(element)
|
||||
finally:
|
||||
queue.put(None)
|
||||
|
||||
def full_library_sync(self):
|
||||
"""
|
||||
|
@ -160,16 +177,27 @@ class FullSync(common.libsync_mixin):
|
|||
(v.PLEX_TYPE_ARTIST, v.PLEX_TYPE_ARTIST, itemtypes.Artist, False),
|
||||
(v.PLEX_TYPE_ALBUM, v.PLEX_TYPE_ARTIST, itemtypes.Album, True),
|
||||
])
|
||||
# Already start setting up the iterators. We need to enforce
|
||||
# syncing e.g. show before season before episode
|
||||
iterator_queue = Queue.Queue()
|
||||
task = backgroundthread.FunctionAsTask(self.threaded_get_iterators,
|
||||
None,
|
||||
kinds,
|
||||
iterator_queue)
|
||||
backgroundthread.BGThreader.addTask(task)
|
||||
with PlexDB() as self.plexdb:
|
||||
for kind in kinds:
|
||||
while True:
|
||||
section = iterator_queue.get()
|
||||
if section is None:
|
||||
break
|
||||
self.section_initiated = False
|
||||
# Setup our variables
|
||||
self.plex_type = kind[0]
|
||||
self.section_type = kind[1]
|
||||
self.context = kind[2]
|
||||
self.get_children = kind[3]
|
||||
self.plex_type = section['plex_type']
|
||||
self.section_type = section['section_type']
|
||||
self.context = section['context']
|
||||
self.get_children = section['get_children']
|
||||
# Now do the heavy lifting
|
||||
if self.isCanceled() or not self.process_kind():
|
||||
if self.isCanceled() or not self.process_section(section):
|
||||
return False
|
||||
# Delete movies that are not on Plex anymore
|
||||
if not self.section_initiated:
|
||||
|
@ -181,10 +209,13 @@ class FullSync(common.libsync_mixin):
|
|||
self.plex_type)
|
||||
self.queue.put(queue_info)
|
||||
self.process_delete()
|
||||
iterator_queue.task_done()
|
||||
return True
|
||||
|
||||
@utils.log_time
|
||||
def run(self):
|
||||
profile = Profile()
|
||||
profile.enable()
|
||||
if self.isCanceled():
|
||||
return
|
||||
successful = False
|
||||
|
@ -226,6 +257,12 @@ class FullSync(common.libsync_mixin):
|
|||
if self.callback:
|
||||
self.callback(successful)
|
||||
LOG.info('Done full_sync')
|
||||
profile.disable()
|
||||
string_io = StringIO()
|
||||
stats = Stats(profile, stream=string_io).sort_stats('cumulative')
|
||||
stats.print_stats()
|
||||
LOG.info('cProfile result: ')
|
||||
LOG.info(string_io.getvalue())
|
||||
|
||||
|
||||
def start(show_dialog, repair=False, callback=None):
|
||||
|
|
Loading…
Reference in a new issue