🎁 Reschedule ImporterJob if downloads aren't done
This commit adds a check in the `ImporterJob` to see whether the cloud
files have finished downloading.  If they haven't, the job will be
rescheduled until they are.
kirkkwang committed Feb 28, 2024
1 parent 4c68fbb commit 7922cde
Showing 4 changed files with 26 additions and 6 deletions.
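
How the new behavior plays out, as a spec-style sketch (not part of the commit; the factory name, the ActiveJob :test adapter, and stubbing the private check are assumptions for illustration):

    require 'rails_helper'

    RSpec.describe Bulkrax::ImporterJob, type: :job do
      it 're-enqueues itself while cloud downloads are incomplete' do
        importer = FactoryBot.create(:bulkrax_importer_csv) # hypothetical factory name
        # Pretend the downloaded files have not reached their expected sizes yet
        allow_any_instance_of(described_class)
          .to receive(:all_files_completed?).and_return(false)

        # perform_now should bail out early and schedule a retry ~3 minutes out
        expect { described_class.perform_now(importer.id) }
          .to have_enqueued_job(described_class).with(importer.id, true)
      end
    end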
3 changes: 2 additions & 1 deletion app/controllers/bulkrax/importers_controller.rb
@@ -214,10 +214,11 @@ def files_for_import(file, cloud_files)
       return if file.blank? && cloud_files.blank?
       @importer[:parser_fields]['import_file_path'] = @importer.parser.write_import_file(file) if file.present?
       if cloud_files.present?
+        @importer[:parser_fields]['cloud_file_paths'] = cloud_files
         # For BagIt, there will only be one bag, so we get the file_path back and set import_file_path
         # For CSV, we expect only file uploads, so we won't get the file_path back
         # and we expect the import_file_path to be set already
-        target = @importer.parser.retrieve_cloud_files(cloud_files)
+        target = @importer.parser.retrieve_cloud_files(cloud_files, @importer)
         @importer[:parser_fields]['import_file_path'] = target if target.present?
       end
       @importer.save
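
The `cloud_file_paths` being stashed here is whatever hash the cloud-file picker posts back; a representative shape (hypothetical values) looks like:

    cloud_files = {
      '0' => { 'url' => 'https://drive.example.com/uc?id=abc', 'file_name' => 'page 1.tiff', 'file_size' => '2048' },
      '1' => { 'url' => 'https://drive.example.com/uc?id=def', 'file_name' => 'page 2.tiff', 'file_size' => '4096' }
    }
    # Persisting this on the importer is what lets ImporterJob#all_files_completed?
    # compare the reported sizes against the downloaded files later.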
20 changes: 18 additions & 2 deletions app/jobs/bulkrax/importer_job.rb
@@ -6,6 +6,7 @@ class ImporterJob < ApplicationJob
 
     def perform(importer_id, only_updates_since_last_import = false)
       importer = Importer.find(importer_id)
+      return schedule(importer, Time.zone.now + 3.minutes, 'Rescheduling: cloud files are not ready yet') unless all_files_completed?(importer)
 
       importer.current_run
       unzip_imported_file(importer.parser)
@@ -16,6 +17,8 @@ def perform(importer_id, only_updates_since_last_import = false)
       importer.set_status_info(e)
     end
 
+    private
+
     def import(importer, only_updates_since_last_import)
       importer.only_updates = only_updates_since_last_import || false
       return unless importer.valid_import?
@@ -36,8 +39,21 @@ def update_current_run_counters(importer)
       importer.current_run.save!
     end
 
-    def schedule(importer)
-      ImporterJob.set(wait_until: importer.next_import_at).perform_later(importer.id, true)
+    def schedule(importer, wait_until = importer.next_import_at, message = nil)
+      Rails.logger.info message if message
+      ImporterJob.set(wait_until: wait_until).perform_later(importer.id, true)
+    end
+
+    # checks the file sizes of the download files to match the original files
+    def all_files_completed?(importer)
+      cloud_files = importer.parser_fields['cloud_file_paths']
+      original_files = importer.parser_fields['original_file_paths']
+      return true unless cloud_files.present? && original_files.present?
+
+      imported_file_sizes = cloud_files.map { |_, v| v['file_size'].to_i }
+      original_file_sizes = original_files.map { |imported_file| File.size(imported_file) }
+
+      original_file_sizes == imported_file_sizes
     end
   end
 end
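
The guard trusts the byte counts the picker reported: the import proceeds only once every downloaded file's size on disk equals its reported `file_size`. Roughly, with hypothetical values:

    cloud_files    = { '0' => { 'file_name' => 'page 1.tiff', 'file_size' => '2048' } }
    original_files = ['/imports/42/files/page_1.tiff'] # paths recorded by the parser

    cloud_files.map { |_, v| v['file_size'].to_i } # => [2048]
    original_files.map { |f| File.size(f) }        # => [2048] once the download finishes
    # Equal arrays mean all_files_completed? returns true; a still-streaming
    # file is smaller than its reported size, so the job reschedules instead.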
4 changes: 2 additions & 2 deletions app/parsers/bulkrax/bagit_parser.rb
@@ -166,12 +166,12 @@ def write_triples(folder_count, e)
     # @todo - investigate getting directory structure
     # @todo - investigate using perform_later, and having the importer check for
     # DownloadCloudFileJob before it starts
-    def retrieve_cloud_files(files)
+    def retrieve_cloud_files(files, _importer)
       # There should only be one zip file for Bagit, take the first
       return if files['0'].blank?
       target_file = File.join(path_for_import, files['0']['file_name'].tr(' ', '_'))
       # Now because we want the files in place before the importer runs
-      Bulkrax::DownloadCloudFileJob.perform_now(files['0'], target_file)
+      Bulkrax::DownloadCloudFileJob.perform_later(files['0'], target_file)
       return target_file
     end
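
The switch from perform_now to perform_later means the bag's zip is now only enqueued for download, so it may not be on disk when `ImporterJob` first runs — the asynchrony the new reschedule guard accounts for. Roughly:

    # Old: ran the download inline; the file existed by the time this returned
    Bulkrax::DownloadCloudFileJob.perform_now(files['0'], target_file)

    # New: only enqueues the job; control returns before the download happens
    Bulkrax::DownloadCloudFileJob.perform_later(files['0'], target_file)
    File.exist?(target_file) # => very likely false right after enqueueing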

5 changes: 4 additions & 1 deletion app/parsers/bulkrax/csv_parser.rb
@@ -190,9 +190,10 @@ def records_split_count
     # @todo - investigate getting directory structure
     # @todo - investigate using perform_later, and having the importer check for
     # DownloadCloudFileJob before it starts
-    def retrieve_cloud_files(files)
+    def retrieve_cloud_files(files, importer)
       files_path = File.join(path_for_import, 'files')
       FileUtils.mkdir_p(files_path) unless File.exist?(files_path)
+      target_files = []
       files.each_pair do |_key, file|
         # fixes bug where auth headers do not get attached properly
         if file['auth_header'].present?
@@ -201,10 +202,12 @@ def retrieve_cloud_files(files)
         end
         # this only works for uniquely named files
         target_file = File.join(files_path, file['file_name'].tr(' ', '_'))
+        target_files << target_file
         # Now because we want the files in place before the importer runs
         # Problematic for a large upload
         Bulkrax::DownloadCloudFileJob.perform_now(file, target_file)
       end
+      importer[:parser_fields]['original_file_paths'] = target_files
       return nil
     end
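
After this method runs, the importer's `parser_fields` carries both halves of the bookkeeping that `all_files_completed?` compares — the sizes the picker reported and the local paths being downloaded to. A representative snapshot (hypothetical values):

    importer[:parser_fields]
    # => {
    #      'import_file_path'    => '/imports/42/import.csv',
    #      'cloud_file_paths'    => { '0' => { 'file_name' => 'page 1.tiff', 'file_size' => '2048' } },
    #      'original_file_paths' => ['/imports/42/files/page_1.tiff']
    #    }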
