Merge pull request #196 from broadinstitute/dp-genbank
improvements to sarscov2_illumina_full
dpark01 authored Jan 16, 2021
2 parents ddbe7dc + fedc2f1 commit 799e054
Showing 14 changed files with 613 additions and 120 deletions.
5 changes: 5 additions & 0 deletions .dockstore.yml
@@ -230,6 +230,11 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/sarscov2_genbank.wdl
testParameterFiles:
- empty.json
+  - name: sarscov2_illumina_full
+    subclass: WDL
+    primaryDescriptorPath: /pipes/WDL/workflows/sarscov2_illumina_full.wdl
+    testParameterFiles:
+      - empty.json
- name: sarscov2_lineages
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/sarscov2_lineages.wdl
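
Before Dockstore re-syncs, the new entry can be sanity-checked locally. A minimal sketch, assuming the third-party miniwdl package (which is not used anywhere in this PR), that the registered workflow file parses and carries the expected name:

    # Sketch only: assumes `pip install miniwdl`; miniwdl is not part of this PR.
    import WDL

    # Relative imports inside the WDL resolve against the document's directory.
    doc = WDL.load("pipes/WDL/workflows/sarscov2_illumina_full.wdl")
    print(doc.workflow.name)  # expect: sarscov2_illumina_full
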
27 changes: 17 additions & 10 deletions pipes/WDL/tasks/tasks_assembly.wdl
@@ -15,7 +15,7 @@ task assemble {
String sample_name = basename(basename(reads_unmapped_bam, ".bam"), ".taxfilt")

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.0"
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.1"
}

command {
@@ -80,7 +80,7 @@ task scaffold {
Float? scaffold_min_pct_contig_aligned

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.0"
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.1"

# do this in multiple steps in case the input doesn't actually have "assembly1-x" in the name
String sample_name = basename(basename(contigs_fasta, ".fasta"), ".assembly1-spades")
@@ -226,7 +226,7 @@ task align_reads {
Boolean? skip_mark_dupes=false

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-core:2.1.16"
String docker="quay.io/broadinstitute/viral-core:2.1.18"

String sample_name = basename(basename(basename(reads_unmapped_bam, ".bam"), ".taxfilt"), ".clean")
}
@@ -342,7 +342,7 @@ task refine_assembly_with_aligned_reads {
Int? min_coverage=3

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.0"
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.1"
}

parameter_meta {
@@ -389,9 +389,16 @@ task refine_assembly_with_aligned_reads {
refined.fasta "${sample_name}.fasta" "${sample_name}"

# collect variant counts
-bcftools filter -e "FMT/DP<${min_coverage}" -S . "${sample_name}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "${sample_name}.diffs.vcf"
-bcftools filter -i 'TYPE="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
-bcftools filter -i 'TYPE!="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
+if (( $(cat refined.fasta | wc -l) > 1 )); then
+  bcftools filter -e "FMT/DP<${min_coverage}" -S . "${sample_name}.sites.vcf.gz" -Ou | bcftools filter -i "AC>1" -Ou > "${sample_name}.diffs.vcf"
+  bcftools filter -i 'TYPE="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_snps
+  bcftools filter -i 'TYPE!="snp"' "${sample_name}.diffs.vcf" | bcftools query -f '%POS\n' | wc -l | tee num_indels
+else
+  # empty output
+  echo "0" > num_snps
+  echo "0" > num_indels
+  cp "${sample_name}.sites.vcf.gz" "${sample_name}.diffs.vcf"
+fi

# collect figures of merit
set +o pipefail # grep will exit 1 if it fails to find the pattern
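
The new branch guards against assemblies that failed outright: a refined FASTA holding only a bare header (or nothing at all) has a `wc -l` of 1 or 0, and running the bcftools pipeline on it would error out. A rough Python restatement of the guard, illustrative only and not part of the task:

    # Rough Python restatement of the shell guard above; illustrative only.
    def has_sequence(fasta_path):
        """True when the FASTA has more than a lone header line (wc -l > 1)."""
        with open(fasta_path) as f:
            return sum(1 for _ in f) > 1

    # has_sequence("refined.fasta") -> run bcftools and count SNPs/indels;
    # otherwise write "0" to num_snps/num_indels and pass the VCF through.
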
@@ -434,7 +441,7 @@ task refine {
Int? min_coverage=1

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.0"
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.1"

String assembly_basename=basename(basename(assembly_fasta, ".fasta"), ".scaffold")
}
@@ -504,7 +511,7 @@ task refine_2x_and_plot {
String? plot_coverage_novoalign_options="-r Random -l 40 -g 40 -x 20 -t 100 -k"

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.0"
String docker="quay.io/broadinstitute/viral-assemble:2.1.16.1"

# do this in two steps in case the input doesn't actually have "cleaned" in the name
String sample_name = basename(basename(reads_unmapped_bam, ".bam"), ".cleaned")
@@ -636,7 +643,7 @@ task run_discordance {
String out_basename = "run"
Int min_coverage=4

String docker="quay.io/broadinstitute/viral-core:2.1.16"
String docker="quay.io/broadinstitute/viral-core:2.1.18"
}

command {
168 changes: 142 additions & 26 deletions pipes/WDL/tasks/tasks_demux.wdl
@@ -6,7 +6,7 @@ task merge_tarballs {
String out_filename

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-core:2.1.16"
String docker="quay.io/broadinstitute/viral-core:2.1.18"
}

command {
@@ -19,17 +19,17 @@ task merge_tarballs {
file_utils.py --version | tee VERSION

file_utils.py merge_tarballs \
-${out_filename} ${sep=' ' tar_chunks} \
+~{out_filename} ~{sep=' ' tar_chunks} \
--loglevel=DEBUG
}

output {
File combined_tar = "${out_filename}"
File combined_tar = "~{out_filename}"
String viralngs_version = read_string("VERSION")
}

runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 7]) + " GB"
cpu: 16
disks: "local-disk 2625 LOCAL"
@@ -38,6 +38,47 @@ task merge_tarballs {
}
}

+task samplesheet_rename_ids {
+  input {
+    File   old_sheet
+    File?  rename_map
+    String old_id_col = 'internal_id'
+    String new_id_col = 'external_id'
+  }
+  String new_base = basename(old_sheet, '.txt')
+  command <<<
+    python3 << CODE
+    import csv
+    # read in the rename_map file
+    old_to_new = {}
+    with open('~{default="/dev/null" rename_map}', 'rt') as inf:
+        for row in csv.DictReader(inf, delimiter='\t'):
+            old_to_new[row['~{old_id_col}']] = row['~{new_id_col}']
+    # change all ids in the sample column to new ids
+    with open('~{old_sheet}', 'rt') as inf:
+        reader = csv.DictReader(inf, delimiter='\t')
+        with open('~{new_base}.renamed.txt', 'w', newline='') as outf:
+            writer = csv.DictWriter(outf, reader.fieldnames, delimiter='\t', dialect=csv.unix_dialect, quoting=csv.QUOTE_MINIMAL)
+            writer.writeheader()
+            for row in reader:
+                row['sample'] = old_to_new.get(row['sample'], row['sample'])
+                writer.writerow(row)
+    CODE
+  >>>
+  output {
+    File new_sheet = '~{new_base}.renamed.txt'
+  }
+  runtime {
+    docker: "python:slim"
+    memory: "1 GB"
+    cpu: 1
+    disks: "local-disk 50 HDD"
+    dx_instance_type: "mem1_ssd1_v2_x2"
+  }
+}
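
The rename_map is read as a tab-delimited table whose header row must include the old_id_col and new_id_col names; sample IDs missing from the map pass through unchanged, and when no rename_map is given, reading /dev/null yields an empty map, so the task is a no-op copy. A toy run of the same logic, with hypothetical file contents:

    # Toy demonstration of the task's core logic; the TSV contents are hypothetical.
    import csv, io

    rename_map = "internal_id\texternal_id\nS1\tUSA-MA-0001\n"
    old_to_new = {row["internal_id"]: row["external_id"]
                  for row in csv.DictReader(io.StringIO(rename_map), delimiter="\t")}

    sheet = "sample\tlane\nS1\t1\nS2\t1\n"
    for row in csv.DictReader(io.StringIO(sheet), delimiter="\t"):
        row["sample"] = old_to_new.get(row["sample"], row["sample"])
        print(row["sample"])  # -> USA-MA-0001, then S2 (unmapped IDs kept as-is)
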
task illumina_demux {
input {
File flowcell_tgz
@@ -60,7 +101,13 @@
Boolean? forceGC=true
Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-core:2.1.16"
String docker="quay.io/broadinstitute/viral-core:2.1.18"
}
+parameter_meta {
+  flowcell_tgz: {
+    description: "Illumina BCL directory compressed as tarball. Must contain RunInfo.xml (unless overridden by runinfo), SampleSheet.csv (unless overridden by samplesheet), RTAComplete.txt, and Data/Intensities/BaseCalls/*",
+    patterns: ["*.tar.gz", "*.tar.zst", "*.tar.bz2", "*.tar.lz4", "*.tgz"]
+  }
+}
command {
@@ -77,12 +124,12 @@
read_utils.py --version | tee VERSION
read_utils.py extract_tarball \
-${flowcell_tgz} $FLOWCELL_DIR \
+~{flowcell_tgz} $FLOWCELL_DIR \
--loglevel=DEBUG
# if we are overriding the RunInfo file, use the path of the file provided. Otherwise find the file
if [ -n "${runinfo}" ]; then
RUNINFO_FILE="${runinfo}"
if [ -n "~{runinfo}" ]; then
RUNINFO_FILE="~{runinfo}"
else
# full RunInfo.xml path
RUNINFO_FILE="$(find $FLOWCELL_DIR -type f -name RunInfo.xml | head -n 1)"
@@ -157,45 +204,48 @@
# use the passed-in (or default) WDL value first, then fall back to the auto-scaled value
# if the result of this is null (nothing is passed in, no autoscaled value, no param is passed to the command)
if [ -n "${minimumBaseQuality}" ]; then demux_min_base_quality="${minimumBaseQuality}"; else demux_min_base_quality="$demux_min_base_quality"; fi
if [ -n "~{minimumBaseQuality}" ]; then demux_min_base_quality="~{minimumBaseQuality}"; else demux_min_base_quality="$demux_min_base_quality"; fi
if [ -n "$demux_min_base_quality" ]; then demux_min_base_quality="--minimum_base_quality=$demux_min_base_quality";fi
if [ -n "${threads}" ]; then demux_threads="${threads}"; else demux_threads="$demux_threads"; fi
if [ -n "~{threads}" ]; then demux_threads="~{threads}"; else demux_threads="$demux_threads"; fi
if [ -n "$demux_threads" ]; then demux_threads="--threads=$demux_threads"; fi
if [ -n "${maxReadsInRamPerTile}" ]; then max_reads_in_ram_per_tile="${maxReadsInRamPerTile}"; else max_reads_in_ram_per_tile="$max_reads_in_ram_per_tile"; fi
if [ -n "~{maxReadsInRamPerTile}" ]; then max_reads_in_ram_per_tile="~{maxReadsInRamPerTile}"; else max_reads_in_ram_per_tile="$max_reads_in_ram_per_tile"; fi
if [ -n "$max_reads_in_ram_per_tile" ]; then max_reads_in_ram_per_tile="--max_reads_in_ram_per_tile=$max_reads_in_ram_per_tile"; fi
if [ -n "${maxRecordsInRam}" ]; then max_records_in_ram="${maxRecordsInRam}"; else max_records_in_ram="$max_records_in_ram"; fi
if [ -n "~{maxRecordsInRam}" ]; then max_records_in_ram="~{maxRecordsInRam}"; else max_records_in_ram="$max_records_in_ram"; fi
if [ -n "$max_records_in_ram" ]; then max_records_in_ram="--max_records_in_ram=$max_records_in_ram"; fi
# note that we are intentionally setting --threads to about 2x the core
# count. seems to still provide speed benefit (over 1x) when doing so.
illumina.py illumina_demux \
$FLOWCELL_DIR \
-${lane} \
+~{lane} \
. \
-${'--sampleSheet=' + samplesheet} \
-${'--runInfo=' + runinfo} \
-${'--sequencing_center=' + sequencingCenter} \
+~{'--sampleSheet=' + samplesheet} \
+~{'--runInfo=' + runinfo} \
+~{'--sequencing_center=' + sequencingCenter} \
--outMetrics=metrics.txt \
--commonBarcodes=barcodes.txt \
-${'--flowcell=' + flowcell} \
+~{'--flowcell=' + flowcell} \
$demux_min_base_quality \
-${'--max_mismatches=' + maxMismatches} \
-${'--min_mismatch_delta=' + minMismatchDelta} \
-${'--max_no_calls=' + maxNoCalls} \
-${'--read_structure=' + readStructure} \
-${'--minimum_quality=' + minimumQuality} \
-${'--run_start_date=' + runStartDate} \
+~{'--max_mismatches=' + maxMismatches} \
+~{'--min_mismatch_delta=' + minMismatchDelta} \
+~{'--max_no_calls=' + maxNoCalls} \
+~{'--read_structure=' + readStructure} \
+~{'--minimum_quality=' + minimumQuality} \
+~{'--run_start_date=' + runStartDate} \
$max_reads_in_ram_per_tile \
$max_records_in_ram \
--JVMmemory="$mem_in_mb"m \
$demux_threads \
-${true='--force_gc=true' false="--force_gc=false" forceGC} \
+~{true='--force_gc=true' false="--force_gc=false" forceGC} \
--append_run_id \
--compression_level=5 \
+--out_meta_by_sample meta_by_sample.json \
+--out_meta_by_filename meta_by_fname.json \
+--out_runinfo runinfo.json \
--loglevel=DEBUG
illumina.py guess_barcodes --expected_assigned_fraction=0 barcodes.txt metrics.txt barcodes_outliers.txt
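
The three new --out_meta_* options export demux metadata as JSON; meta_by_sample.json is read back in the outputs below as a Map[String,Map[String,String]], so its shape is presumably a two-level map along these lines (keys and values hypothetical):

    # Hypothetical shape of meta_by_sample.json; field names are illustrative.
    import json

    meta_by_sample = {
        "sample1": {"library": "lib1", "lane": "1", "run": "FC12345"},
        "sample2": {"library": "lib2", "lane": "1", "run": "FC12345"},
    }
    print(json.dumps(meta_by_sample, indent=2))
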
@@ -208,14 +258,15 @@
echo "$(basename $bam .bam)" >> $OUT_BASENAMES
done
# fastqc
FASTQC_HARDCODED_MEM_PER_THREAD=250 # the value fastqc sets for -Xmx per thread, not adjustable
num_cpus=$(nproc)
num_bam_files=$(cat $OUT_BASENAMES | wc -l)
num_fastqc_jobs=1
num_fastqc_threads=1
total_ram_needed_mb=250
-# determine the number of fastq jobs
+# determine the number of fastqc jobs
while [[ $total_ram_needed_mb -lt $mem_in_mb ]] && [[ $num_fastqc_jobs -lt $num_cpus ]] && [[ $num_fastqc_jobs -lt $num_bam_files ]]; do
num_fastqc_jobs=$(($num_fastqc_jobs+1))
total_ram_needed_mb=$(($total_ram_needed_mb+$FASTQC_HARDCODED_MEM_PER_THREAD))
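
The loop raises the fastqc job count until one of three budgets binds: memory (fastqc fixes -Xmx at 250 MB per thread, per the comment above), available cores, or the number of BAM files. Approximately, in closed form (illustrative only):

    # Approximate closed form of the job-sizing loop above; illustrative only.
    def fastqc_jobs(mem_in_mb, num_cpus, num_bam_files, mem_per_thread_mb=250):
        by_ram = max(1, mem_in_mb // mem_per_thread_mb)
        return min(by_ram, num_cpus, num_bam_files)

    print(fastqc_jobs(mem_in_mb=200 * 1024, num_cpus=32, num_bam_files=96))  # -> 32
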
@@ -257,10 +308,17 @@
Int runtime_sec = ceil(read_float("UPTIME_SEC"))
Int cpu_load_15min = ceil(read_float("LOAD_15M"))
String viralngs_version = read_string("VERSION")
+Map[String,Map[String,String]] meta_by_sample = read_json('meta_by_sample.json')
+Map[String,Map[String,String]] meta_by_filename = read_json('meta_by_fname.json')
+Map[String,String] run_info = read_json('runinfo.json')
+File meta_by_sample_json = 'meta_by_sample.json'
+File meta_by_filename_json = 'meta_by_fname.json'
+File run_info_json = 'runinfo.json'
}
runtime {
docker: "${docker}"
docker: docker
memory: select_first([machine_mem_gb, 200]) + " GB"
cpu: 32
disks: "local-disk 2625 LOCAL"
@@ -269,3 +327,61 @@
preemptible: 0 # this is the very first operation before scatter, so let's get it done quickly & reliably
}
}
+task map_map_setdefault {
+  input {
+    File          map_map_json
+    Array[String] sub_keys
+  }
+  command <<<
+    python3 << CODE
+    import json
+    sub_keys = '~{sep="*" sub_keys}'.split('*')
+    with open('~{map_map_json}', 'rt') as inf:
+        out = json.load(inf)
+    for k in out.keys():
+        for sub_key in sub_keys:
+            out[k].setdefault(sub_key, "")
+    with open('out.json', 'wt') as outf:
+        json.dump(out, outf, indent=2)
+    CODE
+  >>>
+  output {
+    File out_json = 'out.json'
+  }
+  runtime {
+    docker: "python:slim"
+    memory: "1 GB"
+    cpu: 1
+    disks: "local-disk 20 HDD"
+    dx_instance_type: "mem1_ssd1_v2_x2"
+  }
+}
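
map_map_setdefault pads every inner map with empty strings for any missing sub-keys, so downstream WDL Map lookups never hit an absent key on ragged metadata. The effect in miniature:

    # Miniature demo of the setdefault padding performed by the task above.
    out = {"sampleA": {"library": "lib1"}, "sampleB": {}}
    for k in out:
        for sub_key in ("library", "lane"):
            out[k].setdefault(sub_key, "")
    print(out)
    # {'sampleA': {'library': 'lib1', 'lane': ''}, 'sampleB': {'library': '', 'lane': ''}}
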
+task merge_maps {
+  input {
+    Array[File] maps_jsons
+  }
+  command <<<
+    python3 << CODE
+    import json
+    infiles = '~{sep='*' maps_jsons}'.split('*')
+    out = {}
+    for fname in infiles:
+        with open(fname, 'rt') as inf:
+            out.update(json.load(inf))
+    with open('out.json', 'wt') as outf:
+        json.dump(out, outf, indent=2)
+    CODE
+  >>>
+  output {
+    Map[String,Map[String,String]] merged = read_json('out.json')
+  }
+  runtime {
+    docker: "python:slim"
+    memory: "1 GB"
+    cpu: 1
+    disks: "local-disk 20 HDD"
+    dx_instance_type: "mem1_ssd1_v2_x2"
+  }
+}
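
Because merge_maps folds its inputs into one dict with update(), later files in maps_jsons win on key collisions (and whole inner maps are replaced rather than deep-merged), so the order of the input array matters:

    # Collision behavior of the merge above: later inputs win, inner maps replaced whole.
    out = {}
    for m in ({"s1": {"lane": "1"}}, {"s1": {"lane": "2"}, "s2": {"lane": "2"}}):
        out.update(m)
    print(out)  # {'s1': {'lane': '2'}, 's2': {'lane': '2'}}
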
2 changes: 1 addition & 1 deletion pipes/WDL/tasks/tasks_interhost.wdl
@@ -142,7 +142,7 @@ task index_ref {
File? novocraft_license

Int? machine_mem_gb
String docker="quay.io/broadinstitute/viral-core:2.1.16"
String docker="quay.io/broadinstitute/viral-core:2.1.18"
}

command {