Persist Globus file size #3256

Draft: wants to merge 1 commit into main

app/jobs/fetch_globus_job.rb (26 changes: 14 additions & 12 deletions)

@@ -9,41 +9,43 @@ class FetchGlobusJob < BaseDepositJob

   def perform(work_version)
     work_version.attached_files.destroy_all
-    filepaths = filepaths_for(work_version)
+    files = files_for(work_version)

     # Since it can take a while (hours) to get the filepaths from Globus API for large
     # deposits we need to ensure that we still have an active database connection
     # before trying to use it again or else we can get an error:
     # PG::UnableToSend: SSL SYSCALL error: EOF detected
     ActiveRecord::Base.clear_active_connections!

-    filepaths.each do |path|
-      next if ignore?(path)
+    files.each do |file|
+      next if ignore?(file.name)

-      work_version.attached_files << new_attached_file(path, work_version)
+      work_version.attached_files << new_attached_file(file, work_version)
     end
     work_version.upload_type = "browser"
     work_version.fetch_globus_complete!
   end

-  def filepaths_for(work_version)
+  def files_for(work_version)
     GlobusClient
-      .get_filenames(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
-      .map { |filepath| filepath.delete_prefix(work_version.globus_endpoint_fullpath) }
+      .list_files(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
+      .map do |file|
+        file.tap { file.name = file.name.delete_prefix(work_version.globus_endpoint_fullpath) }
+      end
   end

   def ignore?(path)
     path.start_with?("__MACOSX") || path.end_with?(".DS_Store")
   end

-  def new_attached_file(path, work_version)
-    AttachedFile.new(path:, work_version:).tap do |attached_file|
+  def new_attached_file(file, work_version)
+    AttachedFile.new(path: file.name, work_version:).tap do |attached_file|
       blob = ActiveStorage::Blob.create_before_direct_upload!(
         key: attached_file.create_globus_active_storage_key,
-        filename: path,
+        filename: file.name,
         service_name: ActiveStorage::Service::GlobusService::SERVICE_NAME,
-        byte_size: 0,
-        checksum: path
+        byte_size: file.size,
+        checksum: file.name
       )
       attached_file.file.attach(blob)
     end
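
For context (not part of the diff): the job now expects GlobusClient.list_files to return objects exposing name and size rather than bare path strings; the spec below stubs these as GlobusClient::Endpoint::FileInfo. A minimal sketch of the shape the job relies on, where the endpoint prefix and the 24601 size are assumptions taken from the spec stubs:

# Illustrative only; the prefix and size values are assumptions based on the spec stubs.
file = GlobusClient::Endpoint::FileInfo.new("/uploads/jstanford/work333/version1/file1.txt", 24601)
file.name = file.name.delete_prefix("/uploads/jstanford/work333/version1/") # => "file1.txt"
file.name # becomes the AttachedFile#path and the blob's filename/checksum
file.size # => 24601, persisted as the blob's byte_size instead of 0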

app/services/active_storage/service/globus_service.rb (8 changes: 8 additions & 0 deletions)

@@ -11,6 +11,14 @@ def download(key, &)
     raise NotImplementedError
   end

+  def download_chunk(key, range)
+    # This is called by ActiveStorage::Blob::Identifiable when an
+    # ActiveStorage::Blob is being attached to an AttachedFile to identify the
+    # content type of the file. Since we don't have access to the content
+    # here we don't return anything. If we didn't have this here we would get a
+    # NotImplementedError exception.
+  end
+
   def delete(key)
     # This is called by ActiveSupport when #destroy is called on AttachedFile due to our use
     # of ActiveStorage for file storage. This can happen during a decommission of a work.
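
For context (not part of the diff): this override matters because attaching a blob triggers ActiveStorage's content type identification (via ActiveStorage::Blob::Identifiable, per the comment above), and identification reads the file's leading bytes through Service#download_chunk. A rough sketch of that interaction, assuming a Globus-backed blob like the one built in FetchGlobusJob:

# Rough sketch, not part of the PR; attached_file is assumed to be an AttachedFile
# whose blob was created with the "globus" service and a real byte_size.
blob = attached_file.file.blob
blob.identify       # reads leading bytes via service.download_chunk(key, range);
                    # the no-op returns nil, so the type falls back to the filename
blob.content_type   # e.g. "text/plain" for "file1.txt"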

lib/tasks/cleanup.rake (60 changes: 60 additions & 0 deletions)

@@ -1,5 +1,7 @@
 # frozen_string_literal: true

+require "logger"
+
 namespace :cleanup do
   desc "Remove unattached files"
   task uploads: :environment do
@@ -8,4 +10,62 @@ namespace :cleanup do
.where("DATE(active_storage_blobs.created_at) = ?", 7.days.ago.to_date)
.find_each(&:purge_later)
end

desc "Update zero length files"
task file_sizes: :environment do
logger = Logger.new($stdout)

# Find druids that have zero length files in active storage and update them
# using the file size from SDR.
sql =
<<-SQL
SELECT
druid,
work_versions.id AS work_version_id,
active_storage_blobs.filename AS filename,
active_storage_blobs.id AS blob_id
FROM works
JOIN work_versions ON work_versions.work_id = works.id
JOIN attached_files ON attached_files.work_version_id = work_versions.id
JOIN active_storage_attachments ON active_storage_attachments.record_id = attached_files.id
JOIN active_storage_blobs ON active_storage_blobs.id = active_storage_attachments.blob_id
WHERE active_storage_blobs.byte_size = 0
AND druid IS NOT NULL;
SQL

# update the blob with the filesize from the SDR
objects = {}
ActiveRecord::Base.connection.execute(sql).each do |result|
# look up the druid if we haven't seen it already
if !objects.has_key?(result["druid"])
begin
objects[result["druid"]] = Repository.find(result["druid"])
rescue RuntimeError
logger.error("Unable to lookup %{result['druid']} in SDR")
next
end
end
object = objects[result["druid"]]

# find the file in the structural metadata
sdr_file = nil
object.structural.contains.each do |fileset|
sdr_file ||= fileset.structural.contains.find { |file| file.filename == result["filename"] }
end

# update the blob!
if sdr_file
blob = ActiveStorage::Blob.find(result["blob_id"])
if blob.byte_size == 0
blob.byte_size = sdr_file.size
blob.save
logger.info(%(updated blob #{blob.id} size to #{sdr_file.size} for #{result["druid"]}))
else
logger.error(%(blob #{blob.id} for #{result["druid"]} doesn't have zero byte size!))
end
else
logger.error(%(couldn't find #{result["filename"]} for #{result["druid"]}))
end
end
end
end
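
Presumably this backfill is run once per environment (e.g. bin/rails cleanup:file_sizes). A quick follow-up check, sketched here as an assumption rather than part of the PR, that no attached Globus blobs remain at zero bytes afterwards:

# Rough sanity check for a Rails console after running cleanup:file_sizes;
# "globus" matches the blob service_name asserted in the spec below.
ActiveStorage::Blob
  .joins(:attachments)
  .where(service_name: "globus", byte_size: 0)
  .count # ideally 0, aside from works that have no druid in SDR yet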

spec/jobs/fetch_globus_job_spec.rb (19 changes: 11 additions & 8 deletions)

@@ -13,15 +13,17 @@

   let(:work) { build(:work) }

+  let(:file_info) { GlobusClient::Endpoint::FileInfo }
+
   before do
-    allow(GlobusClient).to receive(:get_filenames).and_return(
+    allow(GlobusClient).to receive(:list_files).and_return(
       [
-        "/uploads/jstanford/work333/version1/file1.txt",
-        "/uploads/jstanford/work333/version1/__MACOSX/._file1.txt",
-        "/uploads/jstanford/work333/version1/dir1/file2.txt",
-        "/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt",
-        "/uploads/jstanford/work333/version1/dir2/.DS_Store",
-        "/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store"
+        file_info.new("/uploads/jstanford/work333/version1/file1.txt", 24601),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/._file1.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/dir1/file2.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/dir2/.DS_Store", 1),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store", 1)
       ]
     )
     work.update!(head: first_work_version)
@@ -36,9 +38,10 @@
     expect(AttachedFile.find_by(id: attached_file.id)).to be_nil
     attached_file = first_work_version.reload.attached_files.first
     expect(attached_file.path).to eq("file1.txt")
+    expect(attached_file.byte_size).to eq(24601)
     expect(attached_file.blob.service_name).to eq("globus")
     expect(attached_file.blob.key).to eq("#{first_work_version.work.id}/1/file1.txt")
-    expect(GlobusClient).to have_received(:get_filenames).with(path: "jstanford/work333/version1",
+    expect(GlobusClient).to have_received(:list_files).with(path: "jstanford/work333/version1",
                                                                user_id: work.owner.email)
   end
 end