From 7922cdef39caca037035b854b00a28f92dd64a27 Mon Sep 17 00:00:00 2001
From: Kirk Wang
Date: Thu, 22 Feb 2024 13:58:45 -0800
Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=81=20Reschedule=20ImporterJob=20if=20?=
 =?UTF-8?q?downloads=20aren't=20done?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit adds a check to the `ImporterJob` to see whether the cloud
files have finished downloading. If they haven't, the job is
rescheduled until they have.
---
 .../bulkrax/importers_controller.rb |  3 ++-
 app/jobs/bulkrax/importer_job.rb    | 20 +++++++++++++++++--
 app/parsers/bulkrax/bagit_parser.rb |  4 ++--
 app/parsers/bulkrax/csv_parser.rb   |  5 ++++-
 4 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/app/controllers/bulkrax/importers_controller.rb b/app/controllers/bulkrax/importers_controller.rb
index 7fcbeff0..284caafe 100644
--- a/app/controllers/bulkrax/importers_controller.rb
+++ b/app/controllers/bulkrax/importers_controller.rb
@@ -214,10 +214,11 @@ def files_for_import(file, cloud_files)
       return if file.blank? && cloud_files.blank?
       @importer[:parser_fields]['import_file_path'] = @importer.parser.write_import_file(file) if file.present?
       if cloud_files.present?
+        @importer[:parser_fields]['cloud_file_paths'] = cloud_files
         # For BagIt, there will only be one bag, so we get the file_path back and set import_file_path
         # For CSV, we expect only file uploads, so we won't get the file_path back
         # and we expect the import_file_path to be set already
-        target = @importer.parser.retrieve_cloud_files(cloud_files)
+        target = @importer.parser.retrieve_cloud_files(cloud_files, @importer)
         @importer[:parser_fields]['import_file_path'] = target if target.present?
       end
       @importer.save
diff --git a/app/jobs/bulkrax/importer_job.rb b/app/jobs/bulkrax/importer_job.rb
index 9fb0f445..48e4ae8e 100644
--- a/app/jobs/bulkrax/importer_job.rb
+++ b/app/jobs/bulkrax/importer_job.rb
@@ -6,6 +6,7 @@ class ImporterJob < ApplicationJob
 
     def perform(importer_id, only_updates_since_last_import = false)
       importer = Importer.find(importer_id)
+      return schedule(importer, Time.zone.now + 3.minutes, 'Rescheduling: cloud files are not ready yet') unless all_files_completed?(importer)
       importer.current_run
       unzip_imported_file(importer.parser)
       import(importer, only_updates_since_last_import)
@@ -16,6 +17,8 @@ def perform(importer_id, only_updates_since_last_import = false)
       importer.set_status_info(e)
     end
 
+    private
+
     def import(importer, only_updates_since_last_import)
       importer.only_updates = only_updates_since_last_import || false
       return unless importer.valid_import?
@@ -36,8 +39,21 @@ def update_current_run_counters(importer)
       importer.current_run.save!
     end
 
-    def schedule(importer)
-      ImporterJob.set(wait_until: importer.next_import_at).perform_later(importer.id, true)
+    def schedule(importer, wait_until = importer.next_import_at, message = nil)
+      Rails.logger.info message if message
+      ImporterJob.set(wait_until: wait_until).perform_later(importer.id, true)
+    end
+
+    # Checks that the sizes of the downloaded files match the original files
+    def all_files_completed?(importer)
+      cloud_files = importer.parser_fields['cloud_file_paths']
+      original_files = importer.parser_fields['original_file_paths']
+      return true unless cloud_files.present? && original_files.present?
+
+      imported_file_sizes = cloud_files.map { |_, v| v['file_size'].to_i }
+      original_file_sizes = original_files.map { |imported_file| File.size(imported_file) }
+
+      original_file_sizes == imported_file_sizes
     end
   end
 end
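A note on the readiness check above: `all_files_completed?` does not track
job state; it compares byte counts on disk against the sizes the cloud
provider reported. The controller stores that provider metadata in
`cloud_file_paths` (one entry per file, each carrying a 'file_size'), and
the parser records every download target in `original_file_paths`. A
minimal standalone sketch of the same comparison, with illustrative names
and a `File.exist?` guard that the patch itself does not include:

    # Sketch only: true once every queued download matches the size the
    # cloud provider reported. `cloud_files` mirrors the stored hash,
    # e.g. { '0' => { 'file_name' => 'a.tif', 'file_size' => '1024' } }.
    def downloads_complete?(cloud_files, original_paths)
      expected = cloud_files.map { |_key, meta| meta['file_size'].to_i }
      actual = original_paths.map { |path| File.exist?(path) ? File.size(path) : -1 }
      actual == expected
    end

The guard matters because `File.size` raises Errno::ENOENT for a file that
has not landed yet; the version in the diff calls `File.size` directly, so
it assumes each target file already exists when the job first checks.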
diff --git a/app/parsers/bulkrax/bagit_parser.rb b/app/parsers/bulkrax/bagit_parser.rb
index 8d93a1b5..a937f276 100644
--- a/app/parsers/bulkrax/bagit_parser.rb
+++ b/app/parsers/bulkrax/bagit_parser.rb
@@ -166,12 +166,12 @@ def write_triples(folder_count, e)
     # @todo - investigate getting directory structure
     # @todo - investigate using perform_later, and having the importer check for
     #   DownloadCloudFileJob before it starts
-    def retrieve_cloud_files(files)
+    def retrieve_cloud_files(files, _importer)
       # There should only be one zip file for Bagit, take the first
       return if files['0'].blank?
       target_file = File.join(path_for_import, files['0']['file_name'].tr(' ', '_'))
       # Now because we want the files in place before the importer runs
-      Bulkrax::DownloadCloudFileJob.perform_now(files['0'], target_file)
+      Bulkrax::DownloadCloudFileJob.perform_later(files['0'], target_file)
       return target_file
     end
 
diff --git a/app/parsers/bulkrax/csv_parser.rb b/app/parsers/bulkrax/csv_parser.rb
index 653c15b9..ef93e5d8 100644
--- a/app/parsers/bulkrax/csv_parser.rb
+++ b/app/parsers/bulkrax/csv_parser.rb
@@ -190,9 +190,10 @@ def records_split_count
     # @todo - investigate getting directory structure
     # @todo - investigate using perform_later, and having the importer check for
     #   DownloadCloudFileJob before it starts
-    def retrieve_cloud_files(files)
+    def retrieve_cloud_files(files, importer)
       files_path = File.join(path_for_import, 'files')
       FileUtils.mkdir_p(files_path) unless File.exist?(files_path)
+      target_files = []
       files.each_pair do |_key, file|
         # fixes bug where auth headers do not get attached properly
         if file['auth_header'].present?
@@ -201,10 +202,12 @@ def retrieve_cloud_files(files)
         end
         # this only works for uniquely named files
         target_file = File.join(files_path, file['file_name'].tr(' ', '_'))
+        target_files << target_file
         # Now because we want the files in place before the importer runs
         # Problematic for a large upload
         Bulkrax::DownloadCloudFileJob.perform_now(file, target_file)
       end
+      importer[:parser_fields]['original_file_paths'] = target_files
       return nil
     end
 
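Two smaller points about the plumbing. First, the reworked `schedule`
keeps its old single-argument behavior through default arguments, so
existing scheduled imports are unaffected while the readiness path
supplies its own wait time and log message; as a usage sketch (values
taken from the diff above):

    schedule(importer)  # as before: waits until importer.next_import_at
    schedule(importer, Time.zone.now + 3.minutes,
             'Rescheduling: cloud files are not ready yet')

Second, the two parsers now diverge: the BagIt parser enqueues its single
zip download with `perform_later` and lets the rescheduling loop wait for
it, while the CSV parser still downloads inline with `perform_now` and
only records `original_file_paths` for the size check.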