Refactor code
This commit is contained in:
parent
b69070275f
commit
ddd356deda
1 changed files with 31 additions and 29 deletions
|
@ -35,6 +35,7 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
"""
|
"""
|
||||||
repair=True: force sync EVERY item
|
repair=True: force sync EVERY item
|
||||||
"""
|
"""
|
||||||
|
self.successful = True
|
||||||
self.repair = repair
|
self.repair = repair
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
# For progress dialog
|
# For progress dialog
|
||||||
|
@ -45,21 +46,9 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
self.dialog.create(utils.lang(39714))
|
self.dialog.create(utils.lang(39714))
|
||||||
else:
|
else:
|
||||||
self.dialog = None
|
self.dialog = None
|
||||||
|
|
||||||
self.section_queue = Queue.Queue()
|
|
||||||
self.get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
|
|
||||||
self.processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
|
|
||||||
self.current_time = timing.plex_now()
|
self.current_time = timing.plex_now()
|
||||||
self.last_section = sections.Section()
|
self.last_section = sections.Section()
|
||||||
|
|
||||||
self.successful = True
|
|
||||||
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
self.install_sync_done = utils.settings('SyncInstallRunDone') == 'true'
|
||||||
self.threads = [
|
|
||||||
GetMetadataThread(self.get_metadata_queue, self.processing_queue)
|
|
||||||
for _ in range(int(utils.settings('syncThreadNumber')))
|
|
||||||
]
|
|
||||||
for t in self.threads:
|
|
||||||
t.start()
|
|
||||||
super(FullSync, self).__init__()
|
super(FullSync, self).__init__()
|
||||||
|
|
||||||
def update_progressbar(self, section, title, current):
|
def update_progressbar(self, section, title, current):
|
||||||
|
@ -89,33 +78,42 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH)
|
path_ops.copyfile(v.DB_PLEX_PATH, v.DB_PLEX_COPY_PATH)
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def processing_loop_new_and_changed_items(self):
|
def processing_loop_new_and_changed_items(self, section_queue,
|
||||||
|
processing_queue):
|
||||||
LOG.debug('Start working')
|
LOG.debug('Start working')
|
||||||
|
get_metadata_queue = Queue.Queue(maxsize=BACKLOG_QUEUE_SIZE)
|
||||||
scanner_thread = FillMetadataQueue(self.repair,
|
scanner_thread = FillMetadataQueue(self.repair,
|
||||||
self.section_queue,
|
section_queue,
|
||||||
self.get_metadata_queue)
|
get_metadata_queue)
|
||||||
scanner_thread.start()
|
scanner_thread.start()
|
||||||
|
metadata_threads = [
|
||||||
|
GetMetadataThread(get_metadata_queue, processing_queue)
|
||||||
|
for _ in range(int(utils.settings('syncThreadNumber')))
|
||||||
|
]
|
||||||
|
for t in metadata_threads:
|
||||||
|
t.start()
|
||||||
process_thread = ProcessMetadataThread(self.current_time,
|
process_thread = ProcessMetadataThread(self.current_time,
|
||||||
self.processing_queue,
|
processing_queue,
|
||||||
self.update_progressbar)
|
self.update_progressbar)
|
||||||
process_thread.start()
|
process_thread.start()
|
||||||
LOG.debug('Waiting for scanner thread to finish up')
|
LOG.debug('Waiting for scanner thread to finish up')
|
||||||
scanner_thread.join()
|
scanner_thread.join()
|
||||||
LOG.debug('Waiting for metadata download threads to finish up')
|
LOG.debug('Waiting for metadata download threads to finish up')
|
||||||
for t in self.threads:
|
for t in metadata_threads:
|
||||||
t.join()
|
t.join()
|
||||||
LOG.debug('Download metadata threads finished')
|
LOG.debug('Download metadata threads finished')
|
||||||
# Sentinel for the process_thread once we added everything else
|
# Sentinel for the process_thread once we added everything else
|
||||||
self.processing_queue.put_sentinel(sections.Section())
|
processing_queue.put_sentinel(sections.Section())
|
||||||
|
LOG.debug('Put sentinel into queue, waiting for processing thread')
|
||||||
process_thread.join()
|
process_thread.join()
|
||||||
self.successful = process_thread.successful
|
self.successful = process_thread.successful
|
||||||
LOG.debug('threads finished work. successful: %s', self.successful)
|
LOG.debug('threads finished work. successful: %s', self.successful)
|
||||||
|
|
||||||
@utils.log_time
|
@utils.log_time
|
||||||
def processing_loop_playstates(self):
|
def processing_loop_playstates(self, section_queue):
|
||||||
while not self.should_cancel():
|
while not self.should_cancel():
|
||||||
section = self.section_queue.get()
|
section = section_queue.get()
|
||||||
self.section_queue.task_done()
|
section_queue.task_done()
|
||||||
if section is None:
|
if section is None:
|
||||||
break
|
break
|
||||||
self.playstate_per_section(section)
|
self.playstate_per_section(section)
|
||||||
|
@ -142,7 +140,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
LOG.error('Could not entirely process section %s', section)
|
LOG.error('Could not entirely process section %s', section)
|
||||||
self.successful = False
|
self.successful = False
|
||||||
|
|
||||||
def threaded_get_generators(self, kinds, queue, all_items):
|
def threaded_get_generators(self, kinds, section_queue, processing_queue,
|
||||||
|
all_items):
|
||||||
"""
|
"""
|
||||||
Getting iterators is costly, so let's do it in a dedicated thread
|
Getting iterators is costly, so let's do it in a dedicated thread
|
||||||
"""
|
"""
|
||||||
|
@ -176,17 +175,19 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
else:
|
else:
|
||||||
section.number_of_items = section.iterator.total
|
section.number_of_items = section.iterator.total
|
||||||
if section.number_of_items > 0:
|
if section.number_of_items > 0:
|
||||||
self.processing_queue.add_section(section)
|
processing_queue.add_section(section)
|
||||||
queue.put(section)
|
section_queue.put(section)
|
||||||
LOG.debug('Put section in queue with %s items: %s',
|
LOG.debug('Put section in queue with %s items: %s',
|
||||||
section.number_of_items, section)
|
section.number_of_items, section)
|
||||||
except Exception:
|
except Exception:
|
||||||
utils.ERROR(notify=True)
|
utils.ERROR(notify=True)
|
||||||
finally:
|
finally:
|
||||||
queue.put(None)
|
section_queue.put(None)
|
||||||
LOG.debug('Exiting threaded_get_generators')
|
LOG.debug('Exiting threaded_get_generators')
|
||||||
|
|
||||||
def full_library_sync(self):
|
def full_library_sync(self):
|
||||||
|
section_queue = Queue.Queue()
|
||||||
|
processing_queue = bg.ProcessingQueue(maxsize=XML_QUEUE_SIZE)
|
||||||
kinds = [
|
kinds = [
|
||||||
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE),
|
(v.PLEX_TYPE_MOVIE, v.PLEX_TYPE_MOVIE),
|
||||||
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW),
|
(v.PLEX_TYPE_SHOW, v.PLEX_TYPE_SHOW),
|
||||||
|
@ -202,9 +203,10 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
# We need to enforce syncing e.g. show before season before episode
|
# We need to enforce syncing e.g. show before season before episode
|
||||||
bg.FunctionAsTask(self.threaded_get_generators,
|
bg.FunctionAsTask(self.threaded_get_generators,
|
||||||
None,
|
None,
|
||||||
kinds, self.section_queue, False).start()
|
kinds, section_queue, processing_queue, False).start()
|
||||||
# Do the heavy lifting
|
# Do the heavy lifting
|
||||||
self.processing_loop_new_and_changed_items()
|
self.processing_loop_new_and_changed_items(section_queue,
|
||||||
|
processing_queue)
|
||||||
common.update_kodi_library(video=True, music=True)
|
common.update_kodi_library(video=True, music=True)
|
||||||
if self.should_cancel() or not self.successful:
|
if self.should_cancel() or not self.successful:
|
||||||
return
|
return
|
||||||
|
@ -236,8 +238,8 @@ class FullSync(common.LibrarySyncMixin, bg.KillableThread):
|
||||||
self.dialog = None
|
self.dialog = None
|
||||||
bg.FunctionAsTask(self.threaded_get_generators,
|
bg.FunctionAsTask(self.threaded_get_generators,
|
||||||
None,
|
None,
|
||||||
kinds, self.section_queue, True).start()
|
kinds, section_queue, processing_queue, True).start()
|
||||||
self.processing_loop_playstates()
|
self.processing_loop_playstates(section_queue)
|
||||||
if self.should_cancel() or not self.successful:
|
if self.should_cancel() or not self.successful:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue