Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scaffold_and_refine_multitaxa -- more thorough assembly outputs #524

Merged
merged 21 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .dockstore.yml
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/terra_table_to_tsv.wdl
testParameterFiles:
- /empty.json
- name: terra_tsv_to_table
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/terra_tsv_to_table.wdl
testParameterFiles:
- /empty.json
- name: terra_update_assemblies
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/terra_update_assemblies.wdl
Expand All @@ -394,13 +399,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/bam_to_qiime.wdl
testParameterFiles:
- /empty.json

- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json

- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
Expand Down
27 changes: 14 additions & 13 deletions pipes/WDL/tasks/tasks_assembly.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -578,13 +578,14 @@ task refine_assembly_with_aligned_reads {
input {
File reference_fasta
File reads_aligned_bam
String sample_name
String out_basename = basename(reads_aligned_bam, '.bam')
String sample_name = out_basename

Boolean mark_duplicates = false
Float major_cutoff = 0.5
Int min_coverage = 3

Int? machine_mem_gb
Int machine_mem_gb = 15
String docker = "quay.io/broadinstitute/viral-assemble:2.2.4.0"
}

Expand Down Expand Up @@ -614,7 +615,7 @@ task refine_assembly_with_aligned_reads {
}
}

command {
command <<<
set -ex -o pipefail

# find 90% memory
Expand All @@ -639,36 +640,36 @@ task refine_assembly_with_aligned_reads {
temp_markdup.bam \
refined.fasta \
--already_realigned_bam=temp_markdup.bam \
--outVcf ~{sample_name}.sites.vcf.gz \
--outVcf "~{out_basename}.sites.vcf.gz" \
--min_coverage ~{min_coverage} \
--major_cutoff ~{major_cutoff} \
--JVMmemory "$mem_in_mb"m \
--loglevel=DEBUG

file_utils.py rename_fasta_sequences \
refined.fasta "${sample_name}.fasta" "${sample_name}"
refined.fasta "~{out_basename}.fasta" "~{sample_name}"

# collect variant counts
if (( $(cat refined.fasta | wc -l) > 1 )); then
bcftools filter -e "FMT/DP<${min_coverage}" -S . "${sample_name}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "${sample_name}.diffs.vcf"
bcftools filter -i 'TYPE="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
bcftools filter -i 'TYPE!="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
bcftools filter -e "FMT/DP<~{min_coverage}" -S . "~{out_basename}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "~{out_basename}.diffs.vcf"
bcftools filter -i 'TYPE="snp"' "~{out_basename}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
bcftools filter -i 'TYPE!="snp"' "~{out_basename}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
else
# empty output
echo "0" > num_snps
echo "0" > num_indels
cp "${sample_name}.sites.vcf.gz" "${sample_name}.diffs.vcf"
cp "~{out_basename}.sites.vcf.gz" "~{out_basename}.diffs.vcf"
fi

# collect figures of merit
set +o pipefail # grep will exit 1 if it fails to find the pattern
grep -v '^>' refined.fasta | tr -d '\n' | wc -c | tee assembly_length
grep -v '^>' refined.fasta | tr -d '\nNn' | wc -c | tee assembly_length_unambiguous
}
>>>

output {
File refined_assembly_fasta = "${sample_name}.fasta"
File sites_vcf_gz = "${sample_name}.sites.vcf.gz"
File refined_assembly_fasta = "~{out_basename}.fasta"
File sites_vcf_gz = "~{out_basename}.sites.vcf.gz"
Int assembly_length = read_int("assembly_length")
Int assembly_length_unambiguous = read_int("assembly_length_unambiguous")
Int dist_to_ref_snps = read_int("num_snps")
Expand All @@ -678,7 +679,7 @@ task refine_assembly_with_aligned_reads {

runtime {
docker: docker
memory: select_first([machine_mem_gb, 15]) + " GB"
memory: machine_mem_gb + " GB"
cpu: 8
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
Expand Down
6 changes: 4 additions & 2 deletions pipes/WDL/tasks/tasks_nextstrain.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,8 @@ task augur_mafft_align {

String docker = "nextstrain/base:build-20230905T192825Z"
Int disk_size = 750
Int mem_size = 180
Int cpus = 64
}
command <<<
set -e
Expand All @@ -1165,8 +1167,8 @@ task augur_mafft_align {
>>>
runtime {
docker: docker
memory: "180 GB"
cpu : 64
memory: mem_size + " GB"
cpu : cpus
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
preemptible: 0
Expand Down
4 changes: 3 additions & 1 deletion pipes/WDL/workflows/assemble_refbased.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ workflow assemble_refbased {
Array[File]+ reads_unmapped_bams
File reference_fasta
String sample_name = basename(reads_unmapped_bams[0], '.bam')
String? sample_original_name

String aligner="minimap2"
File? novocraft_license
Expand Down Expand Up @@ -150,7 +151,8 @@ workflow assemble_refbased {
reads_aligned_bam = aligned_trimmed_bam,
min_coverage = min_coverage,
major_cutoff = major_cutoff,
sample_name = sample_name
out_basename = sample_name,
sample_name = select_first([sample_original_name, sample_name])
}

scatter(reads_unmapped_bam in reads_unmapped_bams) {
Expand Down
62 changes: 44 additions & 18 deletions pipes/WDL/workflows/scaffold_and_refine_multitaxa.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ version 1.0
import "../tasks/tasks_assembly.wdl" as assembly
import "../tasks/tasks_metagenomics.wdl" as metagenomics
import "../tasks/tasks_ncbi.wdl" as ncbi
import "../tasks/tasks_reports.wdl" as reports
import "../tasks/tasks_utils.wdl" as utils
import "assemble_refbased.wdl" as assemble_refbased

Expand All @@ -16,6 +17,7 @@ workflow scaffold_and_refine_multitaxa {

input {
String sample_id
Array[String] sample_names
File reads_unmapped_bam

File taxid_to_ref_accessions_tsv
Expand All @@ -25,7 +27,8 @@ workflow scaffold_and_refine_multitaxa {
# Float min_pct_reference_covered = 0.1
}

Int min_scaffold_unambig = 10
Int min_scaffold_unambig = 10
String sample_original_name = flatten([sample_names, [sample_id]])[0]

# if kraken reports are available, filter scaffold list to observed hits (output might be empty!)
if(defined(focal_report_tsv) && defined(ncbi_taxdump_tgz)) {
Expand All @@ -37,7 +40,7 @@ workflow scaffold_and_refine_multitaxa {
}
}

Array[String] assembly_header = ["sample_id", "taxid", "tax_name", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned"]
Array[String] assembly_header = ["entity:assembly_id", "assembly_name", "sample_id", "sample_name", "taxid", "tax_name", "assembly_fasta", "aligned_only_reads_bam", "coverage_plot", "assembly_length", "assembly_length_unambiguous", "reads_aligned", "mean_coverage", "percent_reference_covered", "scaffolding_num_segments_recovered", "reference_num_segments_required", "reference_length", "reference_accessions", "intermediate_gapfill_fasta", "assembly_preimpute_length_unambiguous", "replicate_concordant_sites", "replicate_discordant_snps", "replicate_discordant_indels", "replicate_discordant_vcf", "isnvsFile", "aligned_bam", "coverage_tsv", "read_pairs_aligned", "bases_aligned", "coverage_genbank", "assembly_method", "sample"]
Array[Array[String]] taxid_to_ref_accessions = read_tsv(select_first([filter_refs_to_found_taxa.filtered_taxid_to_ref_accessions_tsv, taxid_to_ref_accessions_tsv]))
scatter(taxon in taxid_to_ref_accessions) {
# cromwell's read_tsv emits [[""]] on empty (0-byte) file input, turn it into []
Expand Down Expand Up @@ -70,35 +73,51 @@ workflow scaffold_and_refine_multitaxa {
# polish de novo assembly with reads
call assemble_refbased.assemble_refbased as refine {
input:
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = scaffold.scaffold_fasta,
sample_name = sample_id
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = scaffold.scaffold_fasta,
sample_name = sample_id,
sample_original_name = sample_original_name
}
String assembly_method_denovo = "viral-ngs/assemble_denovo"
}
if (scaffold.assembly_preimpute_length_unambiguous <= min_scaffold_unambig) {
# fall back to refbased assembly if de novo fails
call assemble_refbased.assemble_refbased as ref_based {
input:
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = download_annotations.combined_fasta,
sample_name = sample_id
reads_unmapped_bams = [reads_unmapped_bam],
reference_fasta = download_annotations.combined_fasta,
sample_name = sample_id,
sample_original_name = sample_original_name
}
String assembly_method_refbased = "viral-ngs/assemble_refbased"
}

# TO DO: if percent_reference_covered > some threshold, run ncbi.rename_fasta_header and ncbi.align_and_annot_transfer_single
# TO DO: if biosample attributes file provided, run ncbi.biosample_to_genbank

String taxid = taxon[0]
String tax_name = taxon[1]
Int assembly_length_unambiguous = select_first([refine.assembly_length_unambiguous, ref_based.assembly_length_unambiguous])
Float percent_reference_covered = 1.0 * assembly_length_unambiguous / scaffold.reference_length
File assembly_fasta = select_first([refine.assembly_fasta, ref_based.assembly_fasta])

if(assembly_length_unambiguous > 0) {
call reports.coverage_report as coverage_self {
input:
mapped_bams = select_all([refine.align_to_self_merged_aligned_only_bam, ref_based.align_to_self_merged_aligned_only_bam]),
mapped_bam_idx = []
}
call utils.tsv_drop_cols as coverage_two_col {
input:
in_tsv = coverage_self.coverage_report,
drop_cols = ["aln2self_cov_median", "aln2self_cov_mean_non0", "aln2self_cov_1X", "aln2self_cov_5X", "aln2self_cov_20X", "aln2self_cov_100X"]
}
}

String taxid = taxon[0]
String tax_name = taxon[1]
Map[String, String] stats_by_taxon = {
"sample_id" : sample_id,
"taxid" : taxid,
"tax_name" : tax_name,
"entity:assembly_id" : sample_id + "-" + taxid,
"assembly_name" : tax_name + ": " + sample_original_name,
"sample_id" : sample_id,
"sample_name" : sample_original_name,
"taxid" : taxid,
"tax_name" : tax_name,

"assembly_fasta" : assembly_fasta,
"aligned_only_reads_bam" : select_first([refine.align_to_self_merged_aligned_only_bam, ref_based.align_to_self_merged_aligned_only_bam]),
Expand All @@ -108,6 +127,10 @@ workflow scaffold_and_refine_multitaxa {
"reads_aligned" : select_first([refine.align_to_self_merged_reads_aligned, ref_based.align_to_self_merged_reads_aligned]),
"mean_coverage" : select_first([refine.align_to_self_merged_mean_coverage, ref_based.align_to_self_merged_mean_coverage]),
"percent_reference_covered" : percent_reference_covered,
"scaffolding_num_segments_recovered" : scaffold.assembly_num_segments_recovered,
"reference_num_segments_required" : scaffold.reference_num_segments_required,
"reference_length" : scaffold.reference_length,
"reference_accessions" : taxon[2],

"intermediate_gapfill_fasta" : scaffold.intermediate_gapfill_fasta,
"assembly_preimpute_length_unambiguous" : scaffold.assembly_preimpute_length_unambiguous,
Expand All @@ -122,15 +145,18 @@ workflow scaffold_and_refine_multitaxa {
"coverage_tsv" : select_first([refine.align_to_self_merged_coverage_tsv, ref_based.align_to_self_merged_coverage_tsv]),
"read_pairs_aligned" : select_first([refine.align_to_self_merged_read_pairs_aligned, ref_based.align_to_self_merged_read_pairs_aligned]),
"bases_aligned" : select_first([refine.align_to_self_merged_bases_aligned, ref_based.align_to_self_merged_bases_aligned]),
"coverage_genbank" : select_first([coverage_two_col.out_tsv, ""]),

"assembly_method" : select_first([assembly_method_denovo, assembly_method_refbased]),

"assembly_method" : select_first([assembly_method_denovo, assembly_method_refbased])
"sample": '{"entityType":"sample","entityName":"' + sample_id + '"}'
}

scatter(h in assembly_header) {
String stat_by_taxon = stats_by_taxon[h]
}
}

### summary stats
call utils.concatenate {
input:
Expand Down
21 changes: 21 additions & 0 deletions pipes/WDL/workflows/terra_tsv_to_table.wdl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
version 1.0

#DX_SKIP_WORKFLOW

import "../tasks/tasks_terra.wdl" as terra

# Workflow: push a TSV of entities into a Terra workspace data table.
# Rows/columns already present in the table are updated in place; new ones
# are inserted (upsert semantics, per the description below).
workflow terra_tsv_to_table {
meta {
description: "Upload tsv file to Terra data table: insert-or-update on existing rows/columns"
author: "Broad Viral Genomics"
email: "[email protected]"
}

# Introspect the Terra execution environment to discover where we are
# running; its outputs provide the current workspace name and namespace
# (billing project), consumed by the upload call below.
call terra.check_terra_env

# Upload the entities TSV into the discovered workspace's data table.
# NOTE(review): the TSV file itself is presumably a launch-time input of
# the upload_entities_tsv task (not wired here) — confirm against
# tasks_terra.wdl, which is outside this view.
call terra.upload_entities_tsv {
input:
workspace_name = check_terra_env.workspace_name,
terra_project = check_terra_env.workspace_namespace
}
}
Loading