Revert "Attempt to fix items getting deleted on second sync"

This reverts commit 459bd72299.
This commit is contained in:
croneter 2018-12-01 18:43:47 +01:00
parent 459bd72299
commit 056463da55
4 changed files with 60 additions and 78 deletions

View file

@ -76,7 +76,6 @@ class ItemBase(object):
self.plexconn.close() self.plexconn.close()
self.kodiconn.close() self.kodiconn.close()
self.artconn.close() self.artconn.close()
LOG.debug('Committed and closed DBs')
return self return self
def commit(self): def commit(self):

View file

@ -11,7 +11,7 @@ from StringIO import StringIO
from .get_metadata import GetMetadataTask, reset_collections from .get_metadata import GetMetadataTask, reset_collections
from .process_metadata import InitNewSection, UpdateLastSync, ProcessMetadata, \ from .process_metadata import InitNewSection, UpdateLastSync, ProcessMetadata, \
DeleteItem, UpdateUserdata, StopSection DeleteItem, UpdateUserdata
from . import common, sections from . import common, sections
from .. import utils, timing, backgroundthread, variables as v, app from .. import utils, timing, backgroundthread, variables as v, app
from .. import plex_functions as PF, itemtypes from .. import plex_functions as PF, itemtypes
@ -38,7 +38,6 @@ class FullSync(common.libsync_mixin):
self.callback = callback self.callback = callback
self.show_dialog = show_dialog self.show_dialog = show_dialog
self.queue = None self.queue = None
self.out_queue = None
self.process_thread = None self.process_thread = None
self.current_sync = None self.current_sync = None
self.plexdb = None self.plexdb = None
@ -72,11 +71,10 @@ class FullSync(common.libsync_mixin):
Removes all the items that have NOT been updated (last_sync timestamp Removes all the items that have NOT been updated (last_sync timestamp
is different) is different)
""" """
for plex_id, last_sync in self.plexdb.plex_id_by_last_sync(self.plex_type, for plex_id in self.plexdb.plex_id_by_last_sync(self.plex_type,
self.current_sync): self.current_sync):
if self.isCanceled(): if self.isCanceled():
return return
LOG.debug('process_delete %s, %s', plex_id, last_sync)
self.queue.put(DeleteItem(plex_id)) self.queue.put(DeleteItem(plex_id))
@utils.log_time @utils.log_time
@ -114,6 +112,12 @@ class FullSync(common.libsync_mixin):
except RuntimeError: except RuntimeError:
LOG.error('Could not entirely process section %s', section) LOG.error('Could not entirely process section %s', section)
return False return False
LOG.debug('Waiting for download threads to finish')
while self.threader.threader.working():
app.APP.monitor.waitForAbort(0.1)
LOG.debug('Waiting for processing thread to finish section')
self.queue.join()
reset_collections()
try: try:
# Sync playstate of every item # Sync playstate of every item
iterator = section['iterator_2'] iterator = section['iterator_2']
@ -124,23 +128,9 @@ class FullSync(common.libsync_mixin):
section['section_id'], section['section_id'],
section['plex_type']) section['plex_type'])
self.queue.put(queue_info) self.queue.put(queue_info)
# We cannot safely use queue.join(), so use another queue
# This will block!
LOG.debug('Waiting for processing to finish process_item part')
self.out_queue.get()
self.out_queue.task_done()
LOG.debug('process_item part done')
reset_collections()
# Delete movies that are not on Plex anymore
self.process_delete()
self.out_queue.get()
self.out_queue.task_done()
if section['plex_type'] != v.PLEX_TYPE_ARTIST: if section['plex_type'] != v.PLEX_TYPE_ARTIST:
self.process_playstate(iterator) self.process_playstate(iterator)
self.queue.put(StopSection()) self.queue.join()
# This will block until processing thread has reached StopSection
self.out_queue.get()
self.out_queue.task_done()
except RuntimeError: except RuntimeError:
LOG.error('Could not process playstate for section %s', section) LOG.error('Could not process playstate for section %s', section)
return False return False
@ -206,6 +196,8 @@ class FullSync(common.libsync_mixin):
# Now do the heavy lifting # Now do the heavy lifting
if self.isCanceled() or not self.process_section(section): if self.isCanceled() or not self.process_section(section):
return False return False
# Delete movies that are not on Plex anymore
self.process_delete()
iterator_queue.task_done() iterator_queue.task_done()
return True return True
@ -226,9 +218,7 @@ class FullSync(common.libsync_mixin):
try: try:
# Fire up our single processing thread # Fire up our single processing thread
self.queue = backgroundthread.Queue.Queue(maxsize=1000) self.queue = backgroundthread.Queue.Queue(maxsize=1000)
self.out_queue = backgroundthread.Queue.Queue()
self.processing_thread = ProcessMetadata(self.queue, self.processing_thread = ProcessMetadata(self.queue,
self.out_queue,
self.current_sync, self.current_sync,
self.show_dialog) self.show_dialog)
self.processing_thread.start() self.processing_thread.start()

View file

@ -29,10 +29,6 @@ class InitNewSection(object):
self.plex_type = plex_type self.plex_type = plex_type
class StopSection(object):
pass
class UpdateUserdata(object): class UpdateUserdata(object):
def __init__(self, xml_item): def __init__(self, xml_item):
self.xml_item = xml_item self.xml_item = xml_item
@ -53,11 +49,16 @@ class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
Not yet implemented for more than 1 thread - if ever. Only to be called by Not yet implemented for more than 1 thread - if ever. Only to be called by
ONE thread! ONE thread!
Processes the XML metadata in the queue Processes the XML metadata in the queue
Input:
queue: Queue.Queue() object that you'll need to fill up with
the downloaded XML eTree objects
item_class: as used to call functions in itemtypes.py e.g. 'Movies' =>
itemtypes.Movies()
""" """
def __init__(self, queue, out_queue, last_sync, show_dialog): def __init__(self, queue, last_sync, show_dialog):
self._canceled = False self._canceled = False
self.queue = queue self.queue = queue
self.out_queue = out_queue
self.last_sync = last_sync self.last_sync = last_sync
self.show_dialog = show_dialog self.show_dialog = show_dialog
self.total = 0 self.total = 0
@ -104,58 +105,50 @@ class ProcessMetadata(common.libsync_mixin, backgroundthread.KillableThread):
# Init with the very first library section. This will block! # Init with the very first library section. This will block!
section = self.queue.get() section = self.queue.get()
self.queue.task_done() self.queue.task_done()
if section is None:
return
while not self.isCanceled(): while not self.isCanceled():
if section is None: if section is None:
break break
if isinstance(section, StopSection): LOG.debug('Start processing section %s (%ss)',
section = self.queue.get() section.name, section.plex_type)
continue self.current = 1
else: self.processed = 0
LOG.debug('Start processing section %s (%ss)', self.total = section.total
section.name, section.plex_type) self.section_name = section.name
self.current = 1 self.section_type_text = utils.lang(
self.processed = 0 v.TRANSLATION_FROM_PLEXTYPE[section.plex_type])
self.total = section.total profile = Profile()
self.section_name = section.name profile.enable()
self.section_type_text = utils.lang( with section.context(self.last_sync) as context:
v.TRANSLATION_FROM_PLEXTYPE[section.plex_type]) while not self.isCanceled():
profile = Profile() # grabs item from queue. This will block!
profile.enable() item = self.queue.get()
with section.context(self.last_sync) as context: if isinstance(item, dict):
while not self.isCanceled(): context.add_update(item['xml'][0],
# grabs item from queue. This will block! section_name=section.name,
item = self.queue.get() section_id=section.id,
if isinstance(item, dict): children=item['children'])
context.add_update(item['xml'][0], self.title = item['xml'][0].get('title')
section_name=section.name, self.processed += 1
section_id=section.id, elif isinstance(item, UpdateLastSync):
children=item['children']) context.plexdb.update_last_sync(item.plex_id,
self.title = item['xml'][0].get('title') section.plex_type,
self.processed += 1 self.last_sync)
elif isinstance(item, UpdateLastSync): elif isinstance(item, UpdateUserdata):
LOG.debug('UpdateLastSync to %s', self.last_sync) context.update_userdata(item.xml_item, section.plex_type)
context.plexdb.update_last_sync(item.plex_id, elif isinstance(item, InitNewSection) or item is None:
section.plex_type, section = item
self.last_sync)
LOG.debug('Item now: %s', context.plexdb.movie(item.plex_id))
elif isinstance(item, UpdateUserdata):
context.update_userdata(item.xml_item, section.plex_type)
elif (isinstance(item, InitNewSection) or
isinstance(item, StopSection) or
item is None):
section = item
self.queue.task_done()
break
else:
context.remove(item.plex_id, plex_type=section.plex_type)
self.update_progressbar()
self.current += 1
if self.processed == 500:
self.processed = 0
context.commit()
self.queue.task_done() self.queue.task_done()
# Tell the main sync thread that we're done with this section break
self.out_queue.put(None) else:
context.remove(item.plex_id, plex_type=section.plex_type)
self.update_progressbar()
self.current += 1
if self.processed == 500:
self.processed = 0
context.commit()
self.queue.task_done()
profile.disable() profile.disable()
string_io = StringIO() string_io = StringIO()
stats = Stats(profile, stream=string_io).sort_stats('cumulative') stats = Stats(profile, stream=string_io).sort_stats('cumulative')

View file

@ -93,8 +93,8 @@ class PlexDBBase(object):
""" """
Returns an iterator for all items where the last_sync is NOT identical Returns an iterator for all items where the last_sync is NOT identical
""" """
return (x for x in return (x[0] for x in
self.cursor.execute('SELECT plex_id, last_sync FROM %s WHERE last_sync <> ?' % plex_type, self.cursor.execute('SELECT plex_id FROM %s WHERE last_sync <> ?' % plex_type,
(last_sync, ))) (last_sync, )))
def checksum(self, plex_id, plex_type): def checksum(self, plex_id, plex_type):