Skip to content

Commit

Permalink
Merge pull request #524 from broadinstitute/dp-scaffold
Browse files Browse the repository at this point in the history
scaffold_and_refine_multitaxa -- more thorough assembly outputs
  • Loading branch information
dpark01 authored Mar 11, 2024
2 parents 1ce64ab + 7898134 commit 935297e
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 36 deletions.
7 changes: 5 additions & 2 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/terra_table_to_tsv.wdl
testParameterFiles:
- /empty.json
- name: terra_tsv_to_table
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/terra_tsv_to_table.wdl
testParameterFiles:
- /empty.json
- name: terra_update_assemblies
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/terra_update_assemblies.wdl
Expand All @@ -394,13 +399,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/bam_to_qiime.wdl
testParameterFiles:
- /empty.json

- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json

- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
Expand Down
27 changes: 14 additions & 13 deletions pipes/WDL/tasks/tasks_assembly.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -578,13 +578,14 @@ task refine_assembly_with_aligned_reads {
input {
File reference_fasta
File reads_aligned_bam
String sample_name
String out_basename = basename(reads_aligned_bam, '.bam')
String sample_name = out_basename
Boolean mark_duplicates = false
Float major_cutoff = 0.5
Int min_coverage = 3
Int? machine_mem_gb
Int machine_mem_gb = 15
String docker = "quay.io/broadinstitute/viral-assemble:2.2.4.0"
}
Expand Down Expand Up @@ -614,7 +615,7 @@ task refine_assembly_with_aligned_reads {
}
}
command {
command <<<
set -ex -o pipefail
# find 90% memory
Expand All @@ -639,36 +640,36 @@ task refine_assembly_with_aligned_reads {
temp_markdup.bam \
refined.fasta \
--already_realigned_bam=temp_markdup.bam \
--outVcf ~{sample_name}.sites.vcf.gz \
--outVcf "~{out_basename}.sites.vcf.gz" \
--min_coverage ~{min_coverage} \
--major_cutoff ~{major_cutoff} \
--JVMmemory "$mem_in_mb"m \
--loglevel=DEBUG
file_utils.py rename_fasta_sequences \
refined.fasta "${sample_name}.fasta" "${sample_name}"
refined.fasta "~{out_basename}.fasta" "~{sample_name}"
# collect variant counts
if (( $(cat refined.fasta | wc -l) > 1 )); then
bcftools filter -e "FMT/DP<${min_coverage}" -S . "${sample_name}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "${sample_name}.diffs.vcf"
bcftools filter -i 'TYPE="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
bcftools filter -i 'TYPE!="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
bcftools filter -e "FMT/DP<~{min_coverage}" -S . "~{out_basename}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "~{out_basename}.diffs.vcf"
bcftools filter -i 'TYPE="snp"' "~{out_basename}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
bcftools filter -i 'TYPE!="snp"' "~{out_basename}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
else
# empty output
echo "0" > num_snps
echo "0" > num_indels
cp "${sample_name}.sites.vcf.gz" "${sample_name}.diffs.vcf"
cp "~{out_basename}.sites.vcf.gz" "~{out_basename}.diffs.vcf"
fi
# collect figures of merit
set +o pipefail # grep will exit 1 if it fails to find the pattern
grep -v '^>' refined.fasta | tr -d '\n' | wc -c | tee assembly_length
grep -v '^>' refined.fasta | tr -d '\nNn' | wc -c | tee assembly_length_unambiguous
}
>>>
output {
File refined_assembly_fasta = "${sample_name}.fasta"
File sites_vcf_gz = "${sample_name}.sites.vcf.gz"
File refined_assembly_fasta = "~{out_basename}.fasta"
File sites_vcf_gz = "~{out_basename}.sites.vcf.gz"
Int assembly_length = read_int("assembly_length")
Int assembly_length_unambiguous = read_int("assembly_length_unambiguous")
Int dist_to_ref_snps = read_int("num_snps")
Expand All @@ -678,7 +679,7 @@ task refine_assembly_with_aligned_reads {
runtime {
docker: docker
memory: select_first([machine_mem_gb, 15]) + " GB"
memory: machine_mem_gb + " GB"
cpu: 8
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
Expand Down
6 changes: 4 additions & 2 deletions pipes/WDL/tasks/tasks_nextstrain.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,8 @@ task augur_mafft_align {
String docker = "nextstrain/base:build-20230905T192825Z"
Int disk_size = 750
Int mem_size = 180
Int cpus = 64
}
command <<<
set -e
Expand All @@ -1165,8 +1167,8 @@ task augur_mafft_align {
>>>
runtime {
docker: docker
memory: "180 GB"
cpu : 64
memory: mem_size + " GB"
cpu : cpus
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
preemptible: 0
Expand Down
4 changes: 3 additions & 1 deletion pipes/WDL/workflows/assemble_refbased.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ workflow assemble_refbased {
Array[File]+ reads_unmapped_bams
File reference_fasta
String sample_name = basename(reads_unmapped_bams[0], '.bam')
String? sample_original_name

String aligner="minimap2"
File? novocraft_license
Expand Down Expand Up @@ -150,7 +151,8 @@ workflow assemble_refbased {
reads_aligned_bam = aligned_trimmed_bam,
min_coverage = min_coverage,
major_cutoff = major_cutoff,
sample_name = sample_name
out_basename = sample_name,
sample_name = select_first([sample_original_name, sample_name])
}

scatter(reads_unmapped_bam in reads_unmapped_bams) {
Expand Down
62 changes: 44 additions & 18 deletions pipes/WDL/workflows/scaffold_and_refine_multitaxa.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ version 1.0
import "../tasks/tasks_assembly.wdl" as assembly
import "../tasks/tasks_metagenomics.wdl" as metagenomics
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_reports.wdl" as reports
import "../tasks/tasks_utils.wdl" as utils
import "assemble_refbased.wdl" as assemble_refbased
Expand All @@ -16,6 +17,7 @@ workflow scaffold_and_refine_multitaxa {

input {
String sample_id
Array[String] sample_names
File reads_unmapped_bam

File taxid_to_ref_accessions_tsv
Expand All @@ -25,7 +27,8 @@ workflow scaffold_and_refine_multitaxa {
# Float min_pct_reference_covered = 0.1
}

Int min_scaffold_unambig = 10
Int min_scaffold_unambig = 10
String sample_original_name = flatten([sample_names, [sample_id]])[0]

# if kraken reports are available, filter scaffold list to observed hits (output might be empty!)
if(defined(focal_report_tsv) && defined(ncbi_taxdump_tgz)) {
Expand All @@ -37,7 +40,7 @@ workflow scaffold_and_refine_multitaxa {
}
}

Array[String] assembly_header = ["sample_id", "taxid", "tax_name", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned"]
Array[String] assembly_header = ["entity:assembly_id", "assembly_name", "sample_id", "sample_name", "taxid", "tax_name", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "scaffolding_num_segments_recovered", "reference_num_segments_required", "reference_length", "reference_accessions", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned", "coverage_genbank", "assembly_method", "sample"]
Array[Array[String]] taxid_to_ref_accessions = read_tsv(select_first([filter_refs_to_found_taxa.filtered_taxid_to_ref_accessions_tsv, taxid_to_ref_accessions_tsv]))
scatter(taxon in taxid_to_ref_accessions) {
# cromwell's read_tsv emits [[""]] on empty (0-byte) file input, turn it into []
Expand Down Expand Up @@ -70,35 +73,51 @@ workflow scaffold_and_refine_multitaxa {
# polish de novo assembly with reads
call assemble_refbased.assemble_refbased as refine {
input:
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = scaffold.scaffold_fasta,
sample_name = sample_id
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = scaffold.scaffold_fasta,
sample_name = sample_id,
sample_original_name = sample_original_name
}
String assembly_method_denovo = "viral-ngs/assemble_denovo"
}
if (scaffold.assembly_preimpute_length_unambiguous <= min_scaffold_unambig) {
# fall back to refbased assembly if de novo fails
call assemble_refbased.assemble_refbased as ref_based {
input:
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = download_annotations.combined_fasta,
sample_name = sample_id
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = download_annotations.combined_fasta,
sample_name = sample_id,
sample_original_name = sample_original_name
}
String assembly_method_refbased = "viral-ngs/assemble_refbased"
}

# TO DO: if percent_reference_covered > some threshold, run ncbi.rename_fasta_header and ncbi.align_and_annot_transfer_single
# TO DO: if biosample attributes file provided, run ncbi.biosample_to_genbank
String taxid = taxon[0]
String tax_name = taxon[1]
Int assembly_length_unambiguous = select_first([refine.assembly_length_unambiguous, ref_based.assembly_length_unambiguous])
Float percent_reference_covered = 1.0 * assembly_length_unambiguous / scaffold.reference_length
File assembly_fasta = select_first([refine.assembly_fasta, ref_based.assembly_fasta])

if(assembly_length_unambiguous > 0) {
call reports.coverage_report as coverage_self {
input:
mapped_bams = select_all([refine.align_to_self_merged_aligned_only_bam, ref_based.align_to_self_merged_aligned_only_bam]),
mapped_bam_idx = []
}
call utils.tsv_drop_cols as coverage_two_col {
input:
in_tsv = coverage_self.coverage_report,
drop_cols = ["aln2self_cov_median", "aln2self_cov_mean_non0", "aln2self_cov_1X", "aln2self_cov_5X", "aln2self_cov_20X", "aln2self_cov_100X"]
}
}

String taxid = taxon[0]
String tax_name = taxon[1]
Map[String, String] stats_by_taxon = {
"sample_id" : sample_id,
"taxid" : taxid,
"tax_name" : tax_name,
"entity:assembly_id" : sample_id + "-" + taxid,
"assembly_name" : tax_name + ": " + sample_original_name,
"sample_id" : sample_id,
"sample_name" : sample_original_name,
"taxid" : taxid,
"tax_name" : tax_name,

"assembly_fasta" : assembly_fasta,
"aligned_only_reads_bam" : select_first([refine.align_to_self_merged_aligned_only_bam, ref_based.align_to_self_merged_aligned_only_bam]),
Expand All @@ -108,6 +127,10 @@ workflow scaffold_and_refine_multitaxa {
"reads_aligned" : select_first([refine.align_to_self_merged_reads_aligned, ref_based.align_to_self_merged_reads_aligned]),
"mean_coverage" : select_first([refine.align_to_self_merged_mean_coverage, ref_based.align_to_self_merged_mean_coverage]),
"percent_reference_covered" : percent_reference_covered,
"scaffolding_num_segments_recovered" : scaffold.assembly_num_segments_recovered,
"reference_num_segments_required" : scaffold.reference_num_segments_required,
"reference_length" : scaffold.reference_length,
"reference_accessions" : taxon[2],

"intermediate_gapfill_fasta" : scaffold.intermediate_gapfill_fasta,
"assembly_preimpute_length_unambiguous" : scaffold.assembly_preimpute_length_unambiguous,
Expand All @@ -122,15 +145,18 @@ workflow scaffold_and_refine_multitaxa {
"coverage_tsv" : select_first([refine.align_to_self_merged_coverage_tsv, ref_based.align_to_self_merged_coverage_tsv]),
"read_pairs_aligned" : select_first([refine.align_to_self_merged_read_pairs_aligned, ref_based.align_to_self_merged_read_pairs_aligned]),
"bases_aligned" : select_first([refine.align_to_self_merged_bases_aligned, ref_based.align_to_self_merged_bases_aligned]),
"coverage_genbank" : select_first([coverage_two_col.out_tsv, ""]),

"assembly_method" : select_first([assembly_method_denovo, assembly_method_refbased]),

"assembly_method" : select_first([assembly_method_denovo, assembly_method_refbased])
"sample": '{"entityType":"sample","entityName":"' + sample_id + '"}'
}

scatter(h in assembly_header) {
String stat_by_taxon = stats_by_taxon[h]
}
}

### summary stats
call utils.concatenate {
input:
Expand Down
21 changes: 21 additions & 0 deletions pipes/WDL/workflows/terra_tsv_to_table.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version 1.0

#DX_SKIP_WORKFLOW
import "../tasks/tasks_terra.wdl" as terra

# Workflow: insert-or-update rows of a Terra data table from a TSV file.
# NOTE(review): no File input for the TSV is declared at workflow level here;
# presumably upload_entities_tsv's own inputs (e.g. the tsv file) are supplied
# via nested call inputs at submission time — confirm against tasks_terra.wdl.
workflow terra_tsv_to_table {
    meta {
        description: "Upload tsv file to Terra data table: insert-or-update on existing rows/columns"
        author: "Broad Viral Genomics"
        email: "[email protected]"
    }

    # Introspect the Terra execution environment to learn which workspace and
    # billing project (namespace) this workflow is running in.
    call terra.check_terra_env

    # Upload the entities TSV into the data table of the current workspace,
    # using the workspace/project discovered above.
    call terra.upload_entities_tsv {
        input:
            workspace_name = check_terra_env.workspace_name,
            terra_project = check_terra_env.workspace_namespace
    }
}

0 comments on commit 935297e

Please sign in to comment.