updates to ImportGenomes and LoadBigQueryData (#7112)
* revert input_vcfs to array[file], add this to sample inputs json

* add this branch to dockstore

* remove this branch from dockstore

* add LoadBigQueryData to dockstore, modify check for existing tables, load from github

* exit with error if bq load fails

* use relative path to import LoadBigQueryData.wdl

* refactor ImportGenomes to contain BQ table creation and loading

* remove for_testing_only

* docker -> docker_final

* last wdl fix please

* remove #done

* add back done - end of for loop

* remove LoadBigQueryData wdl

* ensure tsv creation before making bq tables

* run CreateTables concurrently, clean up old code, LoadTable not preemptible, rename numbered to superpartitioned

* pad table id to 3 digits

* fix padded table id

* fix padded logic again

* fix range for table_id

* remove unused import

* remove feature branch from dockstore.yml
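A quick illustration of the arithmetic behind the last few messages ("pad table id to 3 digits", "fix range for table_id", and the "superpartitioned" renaming): each superpartitioned table covers a block of 4,000 sample_ids, and its numeric suffix is zero-padded to three digits. The bash sketch below mirrors the commands in the CreateTables task in the diff; the example table_id is arbitrary and the sketch is illustrative only.

  #!/bin/bash
  # Illustrative only: maps a table_id to its padded suffix and sample_id range,
  # using the same arithmetic as the CreateTables task below (4,000 samples per table).
  TABLE_ID=3

  # "pad table id to 3 digits": 3 -> 003, giving table names like pet_003 / vet_003
  printf -v PADDED_TABLE_ID "%03d" "${TABLE_ID}"
  echo "table suffix: ${PADDED_TABLE_ID}"

  # sample_id range partition covered by this table: 8001..12000 for table 3
  let "PARTITION_START=(${TABLE_ID}-1)*4000+1"
  let "PARTITION_END=${PARTITION_START}+3999"
  echo "sample_id range: ${PARTITION_START}-${PARTITION_END}"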
mmorgantaylor authored and kcibul committed Mar 9, 2021
1 parent 0fe1dd4 commit 04a9b44
Showing 3 changed files with 199 additions and 227 deletions.
9 changes: 4 additions & 5 deletions scripts/variantstore/wdl/ImportGenomes.example.inputs.json
@@ -6,13 +6,12 @@
"ImportGenomes.drop_state": "SIXTY",
"ImportGenomes.gatk_override": "gs://broad-dsp-spec-ops/kcibul/gatk-package-4.1.8.1-153-g9c3b338-SNAPSHOT-local.jar",

"ImportGenomes.input_vcfs": "this.samples.hg38_reblocked_gvcf",

"ImportGenomes.project_id": "spec-ops-aou",
"ImportGenomes.dataset_name": "kc_import_test1",

"ImportGenomes.sample_map": "gs://fc-50fc04ca-572b-4cba-82d5-4af10722cdc7/test_sample_map.csv",

"ImportGenomes.input_vcfs_list": "gs://fc-50fc04ca-572b-4cba-82d5-4af10722cdc7/input_vcfs.txt",

"ImportGenomes.output_directory": "gs://fc-50fc04ca-572b-4cba-82d5-4af10722cdc7/testrun2_importdir"
"ImportGenomes.output_directory": "gs://fc-50fc04ca-572b-4cba-82d5-4af10722cdc7/testrun2_importdir",
"ImportGenomes.sample_map": "gs://fc-50fc04ca-572b-4cba-82d5-4af10722cdc7/test_sample_map.csv"
}

231 changes: 195 additions & 36 deletions scripts/variantstore/wdl/ImportGenomes.wdl
@@ -1,12 +1,9 @@
version 1.0

import "https://api.firecloud.org/ga4gh/v1/tools/synthetic-microarray-gen:LoadBigQueryData/versions/7/plain-WDL/descriptor" as LoadBigQueryData

workflow ImportGenomes {

input {
File input_vcfs_list
Array[File]? input_metrics
Array[File] input_vcfs
File interval_list
String output_directory
File sample_map
@@ -24,23 +21,62 @@ workflow ImportGenomes {
}
String docker_final = select_first([docker, "us.gcr.io/broad-gatk/gatk:4.1.7.0"])
Array[String] input_vcfs = read_lines(input_vcfs_list)
call GetMaxTableId {
input:
sample_map = sample_map
}

scatter (i in range(length(input_vcfs))) {
if (defined(input_metrics)) {
File input_metric = select_first([input_metrics])[i]
}
call CreateTables as CreateMetadataTables {
input:
project_id = project_id,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "metadata",
max_table_id = GetMaxTableId.max_table_id,
schema = metadata_schema,
superpartitioned = "false",
partitioned = "false",
uuid = "",
preemptible_tries = preemptible_tries,
docker = docker_final
}
call CreateTables as CreatePetTables {
input:
project_id = project_id,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "pet",
max_table_id = GetMaxTableId.max_table_id,
schema = pet_schema,
superpartitioned = "true",
partitioned = "true",
uuid = "",
preemptible_tries = preemptible_tries,
docker = docker_final
}

call CreateTables as CreateVetTables {
input:
project_id = project_id,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "vet",
max_table_id = GetMaxTableId.max_table_id,
schema = vet_schema,
superpartitioned = "true",
partitioned = "true",
uuid = "",
preemptible_tries = preemptible_tries,
docker = docker_final
}

scatter (i in range(length(input_vcfs))) {
call CreateImportTsvs {
input:
input_vcf = input_vcfs[i],
interval_list = interval_list,
input_metrics = input_metric,
sample_map = sample_map,
drop_state = drop_state,
drop_state_includes_greater_than = drop_state_includes_greater_than,
@@ -51,46 +87,53 @@ workflow ImportGenomes {
}
}

call LoadBigQueryData.LoadBigQueryData as LoadMetadataTsvs {
input:
done = CreateImportTsvs.done,
project_id = project_id,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "metadata",
numbered = "false",
partitioned = "false",
max_table_id = GetMaxTableId.max_table_id,
schema = metadata_schema,
preemptible_tries = preemptible_tries,
docker = docker_final
scatter (i in range(GetMaxTableId.max_table_id)) {
call LoadTable as LoadMetadataTable {
input:
project_id = project_id,
table_id = i + 1,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "metadata",
superpartitioned = "false",
schema = metadata_schema,
table_creation_done = CreateMetadataTables.done,
tsv_creation_done = CreateImportTsvs.done,
docker = docker_final
}
}
call LoadBigQueryData.LoadBigQueryData as LoadPetTsvs {
scatter (i in range(GetMaxTableId.max_table_id)) {
call LoadTable as LoadPetTable {
input:
done = CreateImportTsvs.done,
project_id = project_id,
table_id = i + 1,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "pet",
max_table_id = GetMaxTableId.max_table_id,
superpartitioned = "true",
schema = pet_schema,
preemptible_tries = preemptible_tries,
table_creation_done = CreatePetTables.done,
tsv_creation_done = CreateImportTsvs.done,
docker = docker_final
}
}

call LoadBigQueryData.LoadBigQueryData as LoadVetTsvs {
scatter (i in range(GetMaxTableId.max_table_id)) {
call LoadTable as LoadVetTable {
input:
done = CreateImportTsvs.done,
project_id = project_id,
table_id = i + 1,
dataset_name = dataset_name,
storage_location = output_directory,
datatype = "vet",
max_table_id = GetMaxTableId.max_table_id,
superpartitioned = "true",
schema = vet_schema,
preemptible_tries = preemptible_tries,
table_creation_done = CreateVetTables.done,
tsv_creation_done = CreateImportTsvs.done,
docker = docker_final
}
}
}

task GetMaxTableId {
@@ -122,7 +165,6 @@ task GetMaxTableId {
task CreateImportTsvs {
input {
File input_vcf
File? input_metrics
File interval_list
String output_directory
File sample_map
@@ -133,8 +175,6 @@ task CreateImportTsvs {
Int? preemptible_tries
File? gatk_override
String docker

String? for_testing_only
}

Int multiplier = if defined(drop_state) then 4 else 10
@@ -157,14 +197,12 @@
export TMPDIR=/tmp

export GATK_LOCAL_JAR=~{default="/root/gatk.jar" gatk_override}
~{for_testing_only}

gatk --java-options "-Xmx7000m" CreateVariantIngestFiles \
-V ~{input_vcf} \
-L ~{interval_list} \
~{"-IG " + drop_state} \
--ignore-above-gq-threshold ~{drop_state_includes_greater_than} \
~{"-QCF " + input_metrics} \
--mode GENOMES \
-SNM ~{sample_map} \
--ref-version 38
@@ -185,3 +223,124 @@
}
}

# Creates all the tables necessary for the LoadData operation
task CreateTables {
meta {
volatile: true
}

input {
String project_id
String dataset_name
String datatype
Int max_table_id
File schema
String superpartitioned
String partitioned
String uuid

# runtime
Int? preemptible_tries
String docker
}

command <<<
set -x
set -e

PREFIX=""
if [ -n "~{uuid}" ]; then
PREFIX="~{uuid}_"
fi

for TABLE_ID in $(seq 1 ~{max_table_id}); do
PARTITION_STRING=""
if [ ~{partitioned} == "true" ]; then
let "PARTITION_START=(${TABLE_ID}-1)*4000+1"
let "PARTITION_END=$PARTITION_START+3999"
let "PARTITION_STEP=1"
PARTITION_FIELD="sample_id"
PARTITION_STRING="--range_partitioning=$PARTITION_FIELD,$PARTITION_START,$PARTITION_END,$PARTITION_STEP"
fi

if [ ~{superpartitioned} = "true" ]; then
printf -v PADDED_TABLE_ID "%03d" ${TABLE_ID}
TABLE="~{dataset_name}.${PREFIX}~{datatype}_${PADDED_TABLE_ID}"
else
TABLE="~{dataset_name}.${PREFIX}~{datatype}"
fi

# Check that the table has not been created yet
set +e
bq show --project_id ~{project_id} $TABLE > /dev/null
BQ_SHOW_RC=$?
set -e
if [ $BQ_SHOW_RC -ne 0 ]; then
echo "making table $TABLE"
bq --location=US mk ${PARTITION_STRING} --project_id=~{project_id} $TABLE ~{schema}
fi
done
>>>

output {
String done = "true"
}

runtime {
docker: docker
memory: "3 GB"
disks: "local-disk 10 HDD"
preemptible: select_first([preemptible_tries, 5])
cpu: 1
}
}

task LoadTable {
meta {
volatile: true
}

input {
String project_id
String table_id
String dataset_name
String storage_location
String datatype
String superpartitioned
File schema
String table_creation_done
Array[String] tsv_creation_done

String docker
}

command <<<
set -x
set -e

DIR="~{storage_location}/~{datatype}_tsvs/"

printf -v PADDED_TABLE_ID "%03d" ~{table_id}

# even for non-superpartitioned tables (e.g. metadata), the TSVs do have the suffix
FILES="~{datatype}_${PADDED_TABLE_ID}_*"

if [ ~{superpartitioned} = "true" ]; then
TABLE="~{dataset_name}.${PREFIX}~{datatype}_${PADDED_TABLE_ID}"
else
TABLE="~{dataset_name}.${PREFIX}~{datatype}"
fi

bq load --location=US --project_id=~{project_id} --skip_leading_rows=1 --source_format=CSV -F "\t" $TABLE $DIR$FILES ~{schema} || exit 1
echo "ingested ${FILES} file from $DIR into table $TABLE"
gsutil mv $DIR$FILES ${DIR}done/
>>>

runtime {
docker: docker
memory: "3 GB"
disks: "local-disk 10 HDD"
preemptible: 0
cpu: 1
}
}
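As a worked example of the naming convention in the LoadTable task above (a sketch only, with hypothetical bucket and dataset names): the per-shard TSVs always carry the padded table suffix, but only superpartitioned datatypes (pet, vet) carry it in the BigQuery table name; metadata loads into a single unsuffixed table.

  #!/bin/bash
  # Sketch with hypothetical values; the real paths come from the workflow inputs.
  DATATYPE="pet"
  TABLE_ID=2
  STORAGE_LOCATION="gs://example-bucket/import_dir"   # hypothetical path
  DATASET_NAME="example_dataset"                       # hypothetical dataset

  DIR="${STORAGE_LOCATION}/${DATATYPE}_tsvs/"
  printf -v PADDED_TABLE_ID "%03d" "${TABLE_ID}"

  # TSVs always have the suffix, even for non-superpartitioned datatypes like metadata
  FILES="${DATATYPE}_${PADDED_TABLE_ID}_*"             # pet_002_*

  # only superpartitioned tables get the suffix in the table name
  TABLE="${DATASET_NAME}.${DATATYPE}_${PADDED_TABLE_ID}"   # example_dataset.pet_002
  echo "would load ${DIR}${FILES} into ${TABLE}"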