Further optimize sync speed
This commit is contained in:
parent
7fce226d47
commit
861af0f170
2 changed files with 18 additions and 44 deletions
|
@ -10,8 +10,8 @@ from pstats import Stats
|
||||||
from StringIO import StringIO
|
from StringIO import StringIO
|
||||||
|
|
||||||
from .get_metadata import GetMetadataTask, reset_collections
|
from .get_metadata import GetMetadataTask, reset_collections
|
||||||
from .process_metadata import InitNewSection, UpdateLastSync, ProcessMetadata, \
|
from .process_metadata import InitNewSection, UpdateLastSyncAndPlaystate, \
|
||||||
DeleteItem, UpdateUserdata
|
ProcessMetadata, DeleteItem
|
||||||
from . import common, sections
|
from . import common, sections
|
||||||
from .. import utils, timing, backgroundthread, variables as v, app
|
from .. import utils, timing, backgroundthread, variables as v, app
|
||||||
from .. import plex_functions as PF, itemtypes
|
from .. import plex_functions as PF, itemtypes
|
||||||
|
@ -60,7 +60,7 @@ class FullSync(common.libsync_mixin):
|
||||||
xml_item.get('addedAt', 1541572987)))):
|
xml_item.get('addedAt', 1541572987)))):
|
||||||
# Already got EXACTLY this item in our DB. BUT need to collect all
|
# Already got EXACTLY this item in our DB. BUT need to collect all
|
||||||
# DB updates within the same thread
|
# DB updates within the same thread
|
||||||
self.queue.put(UpdateLastSync(plex_id))
|
self.queue.put(UpdateLastSyncAndPlaystate(plex_id, xml_item))
|
||||||
return
|
return
|
||||||
task = GetMetadataTask()
|
task = GetMetadataTask()
|
||||||
task.setup(self.queue, plex_id, self.plex_type, self.get_children)
|
task.setup(self.queue, plex_id, self.plex_type, self.get_children)
|
||||||
|
@ -77,17 +77,6 @@ class FullSync(common.libsync_mixin):
|
||||||
return
|
return
|
||||||
self.queue.put(DeleteItem(plex_id))
|
self.queue.put(DeleteItem(plex_id))
|
||||||
|
|
||||||
@utils.log_time
|
|
||||||
def process_playstate(self, iterator):
|
|
||||||
"""
|
|
||||||
Updates the playstate (resume point, number of views, userrating, last
|
|
||||||
played date, etc.) for all elements in the (xml-)iterator
|
|
||||||
"""
|
|
||||||
for xml_item in iterator:
|
|
||||||
if self.isCanceled():
|
|
||||||
return False
|
|
||||||
self.queue.put(UpdateUserdata(xml_item))
|
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def process_section(self, section):
|
def process_section(self, section):
|
||||||
LOG.debug('Processing library section %s', section)
|
LOG.debug('Processing library section %s', section)
|
||||||
|
@ -97,7 +86,7 @@ class FullSync(common.libsync_mixin):
|
||||||
app.SYNC.path_verified = False
|
app.SYNC.path_verified = False
|
||||||
try:
|
try:
|
||||||
# Sync new, updated and deleted items
|
# Sync new, updated and deleted items
|
||||||
iterator = section['iterator_1']
|
iterator = section['iterator']
|
||||||
# Tell the processing thread about this new section
|
# Tell the processing thread about this new section
|
||||||
queue_info = InitNewSection(section['context'],
|
queue_info = InitNewSection(section['context'],
|
||||||
iterator.total,
|
iterator.total,
|
||||||
|
@ -116,12 +105,8 @@ class FullSync(common.libsync_mixin):
|
||||||
LOG.debug('Waiting for download threads to finish')
|
LOG.debug('Waiting for download threads to finish')
|
||||||
while self.threader.threader.working():
|
while self.threader.threader.working():
|
||||||
app.APP.monitor.waitForAbort(0.1)
|
app.APP.monitor.waitForAbort(0.1)
|
||||||
LOG.debug('Waiting for processing thread to finish section')
|
|
||||||
self.queue.join()
|
|
||||||
reset_collections()
|
reset_collections()
|
||||||
try:
|
try:
|
||||||
# Sync playstate of every item
|
|
||||||
iterator = section['iterator_2']
|
|
||||||
# Tell the processing thread that we're syncing playstate
|
# Tell the processing thread that we're syncing playstate
|
||||||
queue_info = InitNewSection(section['context'],
|
queue_info = InitNewSection(section['context'],
|
||||||
iterator.total,
|
iterator.total,
|
||||||
|
@ -129,10 +114,12 @@ class FullSync(common.libsync_mixin):
|
||||||
section['section_id'],
|
section['section_id'],
|
||||||
section['plex_type'])
|
section['plex_type'])
|
||||||
self.queue.put(queue_info)
|
self.queue.put(queue_info)
|
||||||
|
LOG.debug('Waiting for processing thread to finish section')
|
||||||
|
# Make sure that the processing thread commits all changes
|
||||||
|
self.queue.join()
|
||||||
with PlexDB() as self.plexdb:
|
with PlexDB() as self.plexdb:
|
||||||
if section['plex_type'] != v.PLEX_TYPE_ARTIST:
|
|
||||||
self.process_playstate(iterator)
|
|
||||||
# Delete movies that are not on Plex anymore
|
# Delete movies that are not on Plex anymore
|
||||||
|
LOG.debug('Look for items to delete')
|
||||||
self.process_delete()
|
self.process_delete()
|
||||||
# Wait again till the processing thread is done
|
# Wait again till the processing thread is done
|
||||||
self.queue.join()
|
self.queue.join()
|
||||||
|
@ -158,9 +145,7 @@ class FullSync(common.libsync_mixin):
|
||||||
element['element_type'] = kind[1]
|
element['element_type'] = kind[1]
|
||||||
element['context'] = kind[2]
|
element['context'] = kind[2]
|
||||||
element['get_children'] = kind[3]
|
element['get_children'] = kind[3]
|
||||||
element['iterator_1'] = PF.SectionItems(section['section_id'],
|
element['iterator'] = PF.SectionItems(section['section_id'],
|
||||||
plex_type=kind[0])
|
|
||||||
element['iterator_2'] = PF.SectionItems(section['section_id'],
|
|
||||||
plex_type=kind[0])
|
plex_type=kind[0])
|
||||||
queue.put(element)
|
queue.put(element)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -17,8 +17,6 @@ class InitNewSection(object):
|
||||||
"""
|
"""
|
||||||
Throw this into the queue used for ProcessMetadata to tell it which
|
Throw this into the queue used for ProcessMetadata to tell it which
|
||||||
Plex library section we're looking at
|
Plex library section we're looking at
|
||||||
|
|
||||||
context: itemtypes.Movie, itemtypes.Episode, etc.
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, context, total_number_of_items, section_name,
|
def __init__(self, context, total_number_of_items, section_name,
|
||||||
section_id, plex_type):
|
section_id, plex_type):
|
||||||
|
@ -29,14 +27,10 @@ class InitNewSection(object):
|
||||||
self.plex_type = plex_type
|
self.plex_type = plex_type
|
||||||
|
|
||||||
|
|
||||||
class UpdateUserdata(object):
|
class UpdateLastSyncAndPlaystate(object):
|
||||||
def __init__(self, xml_item):
|
def __init__(self, plex_id, xml_item):
|
||||||
self.xml_item = xml_item
|
|
||||||
|
|
||||||
|
|
||||||
class UpdateLastSync(object):
|
|
||||||
def __init__(self, plex_id):
|
|
||||||
self.plex_id = plex_id
|
self.plex_id = plex_id
|
||||||
|
self.xml_item = xml_item
|
||||||
|
|
||||||
|
|
||||||
class DeleteItem(object):
|
class DeleteItem(object):
|
||||||
|
@ -49,12 +43,6 @@ class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
|
||||||
Not yet implemented for more than 1 thread - if ever. Only to be called by
|
Not yet implemented for more than 1 thread - if ever. Only to be called by
|
||||||
ONE thread!
|
ONE thread!
|
||||||
Processes the XML metadata in the queue
|
Processes the XML metadata in the queue
|
||||||
|
|
||||||
Input:
|
|
||||||
queue: Queue.Queue() object that you'll need to fill up with
|
|
||||||
the downloaded XML eTree objects
|
|
||||||
item_class: as used to call functions in itemtypes.py e.g. 'Movies' =>
|
|
||||||
itemtypes.Movies()
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, queue, last_sync, show_dialog):
|
def __init__(self, queue, last_sync, show_dialog):
|
||||||
self._canceled = False
|
self._canceled = False
|
||||||
|
@ -131,15 +119,15 @@ class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
|
||||||
children=item['children'])
|
children=item['children'])
|
||||||
self.title = item['xml'][0].get('title')
|
self.title = item['xml'][0].get('title')
|
||||||
self.processed += 1
|
self.processed += 1
|
||||||
elif isinstance(item, UpdateLastSync):
|
elif isinstance(item, UpdateLastSyncAndPlaystate):
|
||||||
context.plexdb.update_last_sync(item.plex_id,
|
context.plexdb.update_last_sync(item.plex_id,
|
||||||
section.plex_type,
|
section.plex_type,
|
||||||
self.last_sync)
|
self.last_sync)
|
||||||
elif isinstance(item, UpdateUserdata):
|
if section.plex_type != v.PLEX_TYPE_ARTIST:
|
||||||
context.update_userdata(item.xml_item, section.plex_type)
|
context.update_userdata(item.xml_item,
|
||||||
|
section.plex_type)
|
||||||
elif isinstance(item, InitNewSection) or item is None:
|
elif isinstance(item, InitNewSection) or item is None:
|
||||||
section = item
|
section = item
|
||||||
self.queue.task_done()
|
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
context.remove(item.plex_id, plex_type=section.plex_type)
|
context.remove(item.plex_id, plex_type=section.plex_type)
|
||||||
|
@ -149,6 +137,7 @@ class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
|
||||||
self.processed = 0
|
self.processed = 0
|
||||||
context.commit()
|
context.commit()
|
||||||
self.queue.task_done()
|
self.queue.task_done()
|
||||||
|
self.queue.task_done()
|
||||||
profile.disable()
|
profile.disable()
|
||||||
string_io = StringIO()
|
string_io = StringIO()
|
||||||
stats = Stats(profile, stream=string_io).sort_stats('cumulative')
|
stats = Stats(profile, stream=string_io).sort_stats('cumulative')
|
||||||
|
|
Loading…
Reference in a new issue