Skip to content

Commit

Permalink
adding mash dist
Browse files Browse the repository at this point in the history
  • Loading branch information
erinyoung committed Apr 30, 2024
1 parent f7b4f59 commit 1ac3e60
Showing 1 changed file with 133 additions and 30 deletions.
163 changes: 133 additions & 30 deletions donut_falls.nf
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ process busco {
tag "${meta.id}"
label "process_medium"
publishDir "${params.outdir}/${meta.id}", mode: 'copy', saveAs: { filename -> filename.equals('versions.yml') ? null : filename }
container 'staphb/busco:5.6.1-prok-bacteria_odb10_2024-01-08'
container 'staphb/busco:5.7.1-prok-bacteria_odb10_2024-01-08'
errorStrategy { task.attempt < 2 ? 'retry' : 'ignore'}
time '45m'

Expand Down Expand Up @@ -625,6 +625,57 @@ process gfa_to_fasta {
"""
}

// mash results : Reference-ID, Query-ID, Mash-distance, P-value, and Matching-hashes
process mash {
tag "${meta.id}"
label "process_medium"
publishDir "${params.outdir}/${meta.id}", mode: 'copy', saveAs: { filename -> filename.equals('versions.yml') ? null : filename }
container 'staphb/mash:2.3'
errorStrategy { task.attempt < 2 ? 'retry' : 'ignore'}
time '30m'

input:
tuple val(meta), file(illumina), file(nanopore)

output:
tuple val(meta), env(dist), optional: true, emit: dist
path "mash/*", optional: true, emit: txt
path "versions.yml", emit: versions

when:
task.ext.when == null || task.ext.when

shell:
def args = task.ext.args ?: ''
def ont_args = task.ext.ont_args ?: '-m 2 -c 20'
def ill_args = task.ext.ill_args ?: '-m 2 -c 20'
def short_re = "${illumina.join(' ')}"
def prefix = task.ext.prefix ?: "${meta.id}"
"""
mkdir mash
cat ${short_re} | \
mash sketch ${ill_args} \
-o ${prefix}.illumina -
mash sketch ${ont_args} \
-o ${prefix}.nanopore ${nanopore}
mash dist ${args} \
-p ${task.cpus} \
${prefix}.illumina.msh \
${prefix}.nanopore.msh \
> mash/${prefix}.mashdist.txt
dist=\$(head -n 1 mash/${prefix}.mashdist.txt | awk '{print \$3}')
cat <<-END_VERSIONS > versions.yml
"${task.process}":
mash: \$( mash --version )
END_VERSIONS
"""
}

// From https://github.com/nanoporetech/medaka
// > It is not recommended to specify a value of --threads greater than 2 for medaka consensus since the compute scaling efficiency is poor beyond this.
// > Note also that medaka consensus may been seen to use resources equivalent to <threads> + 4 as an additional 4 threads are used for reading and preparing input data.
Expand Down Expand Up @@ -866,7 +917,7 @@ process nanoplot_summary {
label "process_low"
publishDir "${params.outdir}/summary", mode: 'copy', saveAs: { filename -> filename.equals('versions.yml') ? null : filename }
container 'staphb/nanoplot:1.42.0'
//errorStrategy { task.attempt < 2 ? 'retry' : 'ignore'}
errorStrategy { task.attempt < 2 ? 'retry' : 'ignore'}
time '30m'

input:
Expand Down Expand Up @@ -1038,7 +1089,7 @@ process pypolca {
cat <<-END_VERSIONS > versions.yml
"${task.process}":
pypolca: \$(pypolca --version | awk '{print NF}')
pypolca: \$(pypolca --version | head -n 1 | awk '{print NF}')
END_VERSIONS
"""
}
Expand Down Expand Up @@ -1130,6 +1181,7 @@ process summary {

output:
path "donut_falls_summary.json", emit: summary
path "donut_falls_summary.tsv", emit: csv

when:
task.ext.when == null || task.ext.when
Expand Down Expand Up @@ -1166,12 +1218,36 @@ process summary {
with open('donut_falls_summary.json', 'w') as json_file:
json.dump(dict, json_file, indent=4)
with open('donut_falls_summary.tsv', 'w') as tsv_file:
i = 0
sorted_keys = sorted(dict.keys())
for sample in sorted_keys:
w = csv.DictWriter(tsv_file, dict[sample].keys(), delimiter='\\t')
if i < 1 :
w.writeheader()
i = i+1
w.writerow(dict[sample])
def mash_file(file):
dict = {}
with open(file, mode = 'r') as file:
reader = csv.DictReader(file, delimiter='\\t', fieldnames=['illumina','nanopore', 'dist', 'pvalue', 'hash'])
for row in reader:
key = row['nanopore'].replace('.fastq.gz', '')
dict[key] = row
return dict
def main():
if exists('nanoplot_summary.csv') :
nanoplot_dict = file_to_dict('nanoplot_summary.csv', 'sample', ',')
else:
nanoplot_dict = {}
if exists('mash_summary.tsv') :
mash_dict = mash_file('mash_summary.tsv')
else:
mash_dict = {}
if exists('pypolca_summary.tsv') :
pypolca_dict = file_to_dict('pypolca_summary.tsv', 'sample', '\t')
else:
Expand Down Expand Up @@ -1222,9 +1298,16 @@ process summary {
final_results[key]['number_of_reads'] = nanoplot_dict[key]['number_of_reads']
final_results[key]['mean_read_length'] = nanoplot_dict[key]['mean_read_length']
final_results[key]['mean_qual'] = nanoplot_dict[key]['mean_qual']
# from mash
if key in mash_dict.keys():
final_results[key]['nanopore_illumina_mash_distance'] = mash_dict[key]['dist']
else:
final_results[key]['nanopore_illumina_mash_distance'] = 0
for assembler in assemblers:
if key + "_" + assembler in gfastats_dict.keys():
final_results[key][assembler] = {}
final_results[key]["assembler"] = "${params.assembler}"
# gfastats results
total_length = 0
Expand All @@ -1234,34 +1317,34 @@ process summary {
if gfastats_dict[key + "_" + assembler][contig]["circular"] == "Y":
num_circular = num_circular + 1
final_results[key][assembler]['total_length'] = total_length
final_results[key][assembler]['num_contigs'] = len(gfastats_dict[key + "_" + assembler].keys())
final_results[key][assembler]['circ_contigs'] = num_circular
final_results[key][assembler + '_total_length'] = total_length
final_results[key][assembler + '_num_contigs'] = len(gfastats_dict[key + "_" + assembler].keys())
final_results[key][assembler + '_circ_contigs'] = num_circular
# circulocov results
if key + "_" + assembler in circulocov_dict.keys():
final_results[key][assembler]['coverage'] = circulocov_dict[key + '_' + assembler]['coverage']
final_results[key][assembler]['unmapped_nanopore'] = circulocov_dict[key + '_' + assembler]['unmapped_nanopore']
final_results[key][assembler]['unmapped_illumina'] = circulocov_dict[key + '_' + assembler]['unmapped_illumina']
final_results[key][assembler + '_coverage'] = circulocov_dict[key + '_' + assembler]['coverage']
final_results[key][assembler + '_unmapped_nanopore'] = circulocov_dict[key + '_' + assembler]['unmapped_nanopore']
final_results[key][assembler + '_unmapped_illumina'] = circulocov_dict[key + '_' + assembler]['unmapped_illumina']
# busco results
if key + "_" + assembler in busco_dict.keys():
final_results[key][assembler]['busco'] = busco_dict[key + "_" + assembler]
final_results[key][assembler + '_busco'] = busco_dict[key + "_" + assembler]
if key + "_" + assembler + '_reoriented' in busco_dict.keys():
final_results[key][assembler]['busco'] = busco_dict[key + "_" + assembler + '_reoriented']
final_results[key][assembler + '_busco'] = busco_dict[key + "_" + assembler + '_reoriented']
for step in ['polypolish', 'pypolca', 'medaka']:
if key + "_" + assembler + '_' + step in busco_dict.keys():
final_results[key][assembler]['busco_' + step ] = busco_dict[key + "_" + assembler + '_' + step]
final_results[key][assembler + '_busco_' + step ] = busco_dict[key + "_" + assembler + '_' + step]
else:
final_results[key][assembler]['busco_' + step ] = 'NF'
final_results[key][assembler + '_busco_' + step ] = 'NF'
# pypolca results
if key + "_" + assembler in pypolca_dict.keys():
final_results[key][assembler]['Consensus_Quality_Before_Polishing'] = pypolca_dict[key + "_" + assembler]['Consensus_Quality_Before_Polishing']
final_results[key][assembler]['Consensus_QV_Before_Polishing'] = pypolca_dict[key + "_" + assembler]['Consensus_QV_Before_Polishing']
final_results[key][assembler + '_Consensus_Quality_Before_Polishing'] = pypolca_dict[key + "_" + assembler]['Consensus_Quality_Before_Polishing']
final_results[key][assembler + '_Consensus_QV_Before_Polishing'] = pypolca_dict[key + "_" + assembler]['Consensus_QV_Before_Polishing']
else:
final_results[key][assembler]['Consensus_Quality_Before_Polishing'] = 0
final_results[key][assembler]['Consensus_QV_Before_Polishing'] = 0
final_results[key][assembler + '_Consensus_Quality_Before_Polishing'] = 0
final_results[key][assembler + '_Consensus_QV_Before_Polishing'] = 0
final_file(final_results)
Expand Down Expand Up @@ -1319,7 +1402,7 @@ process versions {
label "process_single"
publishDir "${params.outdir}/summary", mode: 'copy', saveAs: { filename -> filename.equals('versions.yml') ? null : filename }
container 'staphb/multiqc:1.19'
time '10m'
time '30m'
errorStrategy { task.attempt < 2 ? 'retry' : 'ignore'}

input:
Expand Down Expand Up @@ -1465,8 +1548,28 @@ workflow DONUT_FALLS {
// versions channel
ch_versions = Channel.empty()

mash(ch_illumina_input.join(ch_nanopore_input, by: 0, remainder: false))

ch_versions = ch_versions.mix(mash.out.versions)

mash.out.txt
.collectFile(
storeDir: "${params.outdir}/summary/",
keepHeader: false,
sort: { file -> file.text },
name: "mash_summary.tsv")
.set { mash_summary }

ch_summary = ch_summary.mix(mash_summary)

ch_illumina_input
.join(mash.out.dist, by: 0)
.filter{it[2] as float < 0.01}
.map{it -> tuple(it[0], it[1])}
.set {ch_dist_filter}

if (params.assembler =~ /unicycler/ ) {
unicycler(ch_illumina_input.join(ch_nanopore_input, by: 0, remainder: false))
unicycler(ch_dist_filter.join(ch_nanopore_input, by: 0, remainder: false))

ch_gfa = ch_gfa.mix(unicycler.out.gfa)
// no ch_summary
Expand All @@ -1476,7 +1579,7 @@ workflow DONUT_FALLS {

if (params.assembler.replaceAll('dragonflye','dragon') =~ /flye/ || params.assembler =~ /raven/ ) {
// quality filter
ch_illumina_input
ch_dist_filter
.map { it -> [it[0], it[1], "illumina"]}
.mix(ch_nanopore_input.map { it -> [it[0], it[1], "nanopore"]})
.filter{it[0]}
Expand Down Expand Up @@ -1573,8 +1676,8 @@ workflow DONUT_FALLS {
.set { ch_medaka_out }

ch_medaka_out.flye
.join(ch_illumina_input, by:0, remainder: false)
.mix(ch_medaka_out.raven.join(ch_illumina_input, by:0, remainder: false))
.join(ch_dist_filter, by:0, remainder: false)
.mix(ch_medaka_out.raven.join(ch_dist_filter, by:0, remainder: false))
.set { ch_medaka_polished }

bwa(ch_medaka_polished)
Expand All @@ -1591,8 +1694,8 @@ workflow DONUT_FALLS {
.set { ch_polypolish_out }

ch_polypolish_out.flye
.join(ch_illumina_input, by:0, remainder: false)
.mix(ch_polypolish_out.raven.join(ch_illumina_input, by:0, remainder: false))
.join(ch_dist_filter, by:0, remainder: false)
.mix(ch_polypolish_out.raven.join(ch_dist_filter, by:0, remainder: false))
.set { ch_polypolish_polished }

pypolca(ch_polypolish_polished)
Expand Down Expand Up @@ -1624,7 +1727,7 @@ workflow DONUT_FALLS {
busco(ch_consensus)

ch_summary = ch_summary.mix(busco.out.summary)
ch_versions = ch_versions.mix(busco.out.versions)
ch_versions = ch_versions.mix(busco.out.versions.first())

ch_consensus
.filter{ it -> !(it[1] =~ /pypolca/ )}
Expand All @@ -1639,10 +1742,10 @@ workflow DONUT_FALLS {
.set { ch_assemblies }

ch_assemblies.dragonflye
.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_illumina_input, by: 0, remainder: true)
.mix(ch_assemblies.flye.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_illumina_input, by: 0, remainder: true))
.mix(ch_assemblies.unicycler.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_illumina_input, by: 0, remainder: true))
.mix(ch_assemblies.raven.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_illumina_input, by: 0, remainder: true))
.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_dist_filter, by: 0, remainder: true)
.mix(ch_assemblies.flye.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_dist_filter, by: 0, remainder: true))
.mix(ch_assemblies.unicycler.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_dist_filter, by: 0, remainder: true))
.mix(ch_assemblies.raven.join(ch_nanopore_input, by: 0 , remainder: false).join(ch_dist_filter, by: 0, remainder: true))
.filter{ it -> if (it) {it[1]}}
.set{ch_assembly_reads}

Expand Down

0 comments on commit 1ac3e60

Please sign in to comment.