From 7a79db3a830e19598329bd7546c9c6c7fc53242d Mon Sep 17 00:00:00 2001 From: Christopher Tomkins-Tinch Date: Tue, 10 Dec 2024 19:27:54 -0500 Subject: [PATCH] add workflow "unpack_archive_to_bucket"; add task "unpack_archive_to_bucket_path" This adds a new WDL task, "unpack_archive_to_bucket_path", to unpack an input tarball to a specified bucket path, with options to set the target destination, layers of wrapping directories to remove (from around the tarball contents), and whether the files extracted should be uploaded directly via pipe or from a temporary location on-disk. This task is intended to run on Terra, but it can be run elsewhere if a Google Cloud auth token is passed in to the task. A corresponding workflow, unpack_archive_to_bucket, is also added. --- .dockstore.yml | 39 ++-- pipes/WDL/tasks/tasks_utils.wdl | 169 +++++++++++++++++- .../workflows/unpack_archive_to_bucket.wdl | 24 +++ 3 files changed, 210 insertions(+), 22 deletions(-) create mode 100644 pipes/WDL/workflows/unpack_archive_to_bucket.wdl diff --git a/.dockstore.yml b/.dockstore.yml index 0c9012485..f7f5e78a1 100644 --- a/.dockstore.yml +++ b/.dockstore.yml @@ -65,6 +65,16 @@ workflows: primaryDescriptorPath: /pipes/WDL/workflows/beast_gpu.wdl testParameterFiles: - /empty.json + - name: blastoff + subclass: WDL + primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl + testParameterFiles: + - /empty.json + - name: chunk_blast + subclass: WDL + primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl + testParameterFiles: + - /empty.json - name: classify_kaiju subclass: WDL primaryDescriptorPath: /pipes/WDL/workflows/classify_kaiju.wdl @@ -100,6 +110,16 @@ workflows: primaryDescriptorPath: /pipes/WDL/workflows/coverage_table.wdl testParameterFiles: - /empty.json + - name: create_enterics_qc_viz + subclass: WDL + primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl + testParameterFiles: + - /empty.json + - name: create_enterics_qc_viz_general + subclass: WDL + primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl + testParameterFiles: + - /empty.json - name: demux_metag subclass: WDL primaryDescriptorPath: /pipes/WDL/workflows/demux_metag.wdl @@ -404,23 +424,8 @@ workflows: primaryDescriptorPath: /pipes/WDL/workflows/bam_to_qiime.wdl testParameterFiles: - /empty.json - - name: create_enterics_qc_viz - subclass: WDL - primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl - testParameterFiles: - - /empty.json - - name: create_enterics_qc_viz_general - subclass: WDL - primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl - testParameterFiles: - - /empty.json - - name: blastoff + - name: unpack_archive_to_bucket subclass: WDL - primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl - testParameterFiles: - - /empty.json - - name: chunk_blast - subclass: WDL - primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl + primaryDescriptorPath: /pipes/WDL/workflows/unpack_archive_to_bucket.wdl testParameterFiles: - /empty.json diff --git a/pipes/WDL/tasks/tasks_utils.wdl b/pipes/WDL/tasks/tasks_utils.wdl index ac9d3553a..7d7080ccf 100644 --- a/pipes/WDL/tasks/tasks_utils.wdl +++ b/pipes/WDL/tasks/tasks_utils.wdl @@ -27,6 +27,165 @@ task concatenate { } } +task unpack_archive_to_bucket_path { + meta { + description: "Unpack archive(s) to a target location within a Google Storage bucket" + } + input { + # input archive(s) + Array[File] input_archive_files + + # destination for extracted files + String bucket_path_prefix + String? out_dir_name + + # gcloud storage options + Boolean clobber_existing = false + String? gcloud_access_token + + # tar options + Boolean bypass_disk_and_unpack_directly_to_bucket = false + Int? archive_wrapper_directories_to_strip + String tar_opts = "-v --ignore-zeros --no-ignore-command-error" + + # resource requirements + Int disk_size = 500 + Int machine_mem_gb = 128 + String docker = "quay.io/broadinstitute/viral-core:2.4.0" + } + + parameter_meta { + input_archive_files: { + description: "List of input archive files to unpack.", + patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"] + } + bucket_path_prefix: { + description: "The path prefix to the Google Storage bucket location where the archive contents will be unpacked. This must begin with the bucket name, should start with 'gs://', and can include as many sub-directories as desired." + } + out_dir_name: { + description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix." + } + gcloud_access_token: { + description: "Access token for the Google Cloud Storage bucket, if needed to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be used (service/pet account on Terra)." + } + archive_wrapper_directories_to_strip: { + description: "If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar." + } + clobber_existing: { + description: "If true, overwrite files in the target directory of the bucket if they already exist." + } + bypass_disk_and_unpack_directly_to_bucket: { + description: "If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation." + } + tar_opts: { + description: "Options to pass to the tar command during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'" + } + disk_size: { + description: "Size of the disk to allocate for the task, in GB. Note that if multiple files are provided to 'input_archive_files', and extracted data is written to the disk (bypass_disk_and_unpack_directly_to_bucket=false), the extracted data from one archive will be removed before extracting and uploading data from the next input archive." + } + machine_mem_gb: { + description: "Memory to allocate for the task, in GB." + } + } + + command <<< + if ~{if(defined(gcloud_access_token)) then 'true' else 'false'}; then + # set access token env var expected by gcloud, + # if provided by the user + export CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}" + fi + + # check whether the bucket path prefix begins with "gs://" and if not, + # prepend the 'protocol'; also strip leading or trailing slash if present + # (for flexibility—the user can specify the bucket path prefix with or without the protocol) + bucket_path_prefix=$(echo "~{bucket_path_prefix}" | sed -e 's|^gs://||' -e 's|/$||' -e 's|^/*||' -e 's|^|gs://|') + + # check that, excluding the gs:// 'protocol' prefix, the bucket path prefix is not empty + if [ -z "${bucket_path_prefix/#gs:\/\//}" ]; then + echo "ERROR: bucket path prefix is empty" >&2 + exit 1 + fi + + # check whether the user can write to the target bucket + # by trying a simple write action, since we cannot rely on + # the user having the permissions needed to view the IAM policie(ss) that + # determine their (write) access to the bucket + if ! echo "write_test" | gcloud storage cp - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then + echo "ERROR: user does not have write access to the target bucket: ~{bucket_path_prefix}" >&2 + exit 1 + else + # clean up the test file if the write test was successful + gcloud storage rm "${bucket_path_prefix}/.tmp/test-write-access.txt" + fi + + # for each of the input archives provided, extract the contents to the target bucket + # either directly via pipe, or from an intermediate location on disk + for input_archive in ~{sep=' ' input_archive_files}; do + echo "Processing archive: $(basename "${input_archive}")" + + # if the user has requested to bypass writing to disk between extraction and upload + if ~{if(bypass_disk_and_unpack_directly_to_bucket) then 'true' else 'false'}; then + echo "Unpacking archive(s) and piping directly to gcloud storage upload processes (bypassing the disk)..." + + # TODO: parallelize if needed and if the increased memory usage is acceptable + # either via GNU parallel ( https://www.gnu.org/software/parallel/parallel_examples.html ) + # or by simply pushing the tar processes to the background + + # pipe each file to a command via stdout, relying GNU tar to pass file information + # out of band via special environment variables set for each file when using the --to-command + # + # documentation here: + # https://www.gnu.org/software/tar/manual/html_section/extract-options.html#Writing-to-an-External-Program + tar ~{tar_opts} -x \ + ~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \ + --to-command='gcloud storage cp ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \ + -f "${input_archive}" + + # otherwise extract to disk and then upload to the bucket + else + echo 'Extracting archive '$(basename "${input_archive}")' to disk before upload...' + + # create a temporary directory to extract the archive contents to + mkdir -p extracted_tmp + + # extract the archive to the temporary directory + tar ~{tar_opts} -x \ + --directory "./extracted_tmp" \ + ~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \ + -f "${input_archive}" + + pushd extracted_tmp + + echo "Uploading extracted files to the target bucket..." + + # gcloud storage rsync the extracted files to the target bucket in the target directory + gcloud storage rsync \ + --recursive \ + ~{if clobber_existing then "" else "--no-clobber"} \ + --verbosity warning \ + ./ "${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}" + + popd + rm -r ./extracted_tmp + fi + done + >>> + + runtime { + docker: docker + memory: machine_mem_gb + " GB" + cpu: 16 + disks: "local-disk " + disk_size + " LOCAL" + disk: disk_size + " GB" # TES + dx_instance_type: "mem3_ssd1_v2_x16" + preemptible: 0 + maxRetries: 1 + } + + output { + } +} + task zcat { meta { description: "Glue together a bunch of text files that may or may not be compressed (autodetect among gz,xz,bz2,lz4,zst or uncompressed inputs). Optionally compress the output (depending on requested file extension)" @@ -256,7 +415,7 @@ task download_from_url { # ---- # get the name of the downloaded file - downloaded_file_name="$(basename $(ls -1 | head -n1))" + downloaded_file_name="$(basename "$(ls -1 | head -n1)")" if [ ! -f "$downloaded_file_name" ]; then echo "Could not locate downloaded file \"$downloaded_file_name\"" @@ -274,12 +433,12 @@ task download_from_url { # since wget stores both in a single file separated by a couple newlines if [[ "~{url_to_download}" =~ ^https?:// ]] && ~{if save_response_header_to_file then "true" else "false"}; then echo "Saving response headers separately..." - csplit -f response -s tmp/${downloaded_file_name} $'/^\r$/+1' && \ - mv response00 ../${downloaded_file_name}.headers && \ - mv response01 ${downloaded_file_name} && \ + csplit -f response -s "tmp/${downloaded_file_name}" $'/^\r$/+1' && \ + mv response00 "../${downloaded_file_name}.headers" && \ + mv response01 "${downloaded_file_name}" && \ rm "tmp/$downloaded_file_name" else - mv tmp/${downloaded_file_name} ${downloaded_file_name} + mv "tmp/${downloaded_file_name} ${downloaded_file_name}" fi # alternative python implementation to split response headers from body # via https://stackoverflow.com/a/75483099 diff --git a/pipes/WDL/workflows/unpack_archive_to_bucket.wdl b/pipes/WDL/workflows/unpack_archive_to_bucket.wdl new file mode 100644 index 000000000..2a2b19a9a --- /dev/null +++ b/pipes/WDL/workflows/unpack_archive_to_bucket.wdl @@ -0,0 +1,24 @@ +version 1.0 + +import "../tasks/tasks_utils.wdl" as tasks_utils +import "../tasks/tasks_terra.wdl" as tasks_terra + +workflow unpack_archive_to_bucket { + meta { + description: "Unpack archive(s) to a target location within a Google Storage bucket" + author: "Broad Viral Genomics" + email: "viral-ngs@broadinstitute.org" + + allowNestedInputs: true + } + + input { + String? gcloud_auth_token + } + + call tasks_terra.check_terra_env + + if( (check_terra_env.is_running_on_terra && check_terra_env.is_backed_by_gcp) || defined(gcloud_auth_token) ) { + call tasks_utils.unpack_archive_to_bucket_path + } +}