Commit 16ba808

Move assembly-input logic back into shortread subworkflow. Make sure to always declare the ch_short_reads_assembly even if assembly-input

muabnezor committed Nov 25, 2024
1 parent 9273717 commit 16ba808
Showing 2 changed files with 132 additions and 130 deletions.
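The second half of the commit message is the key detail: the subworkflow must define ch_short_reads_assembly unconditionally, because the emit: block references it even when params.assembly_input makes the whole preprocessing branch a no-op. Below is a minimal, hypothetical sketch of that pattern only; DEMO_PREPROCESSING and ch_reads are illustrative names, not part of the pipeline, and the real subworkflow is shown in the diff that follows.

    workflow DEMO_PREPROCESSING {
        take:
        ch_reads    // channel of [ meta, reads ] tuples

        main:
        // Declare the output channel up front so the emit block below is always
        // valid, even when params.assembly_input skips preprocessing entirely.
        ch_short_reads_assembly = Channel.empty()

        if (!params.assembly_input) {
            // ... trimming, host/PhiX removal, run merging would happen here ...
            ch_short_reads_assembly = ch_reads
        }
        // With params.assembly_input set, the channel stays empty rather than
        // undefined, so referencing it in the emit block does not fail.

        emit:
        short_reads_assembly = ch_short_reads_assembly
    }
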
235 changes: 122 additions & 113 deletions subworkflows/local/shortread_preprocessing.nf
Both hunks are shown below as they read after the change; the pre-existing preprocessing steps are unchanged apart from being nested one level deeper inside the new if (!params.assembly_input) block.

@@ -25,6 +25,7 @@ workflow SHORTREAD_PREPROCESSING {

    main:
    ch_versions = Channel.empty()
    ch_multiqc_files = Channel.empty()
    ch_short_reads_assembly = Channel.empty()

    FASTQC_RAW(
        ch_raw_short_reads

@@ -33,142 +34,150 @@

    ch_multiqc_files = ch_multiqc_files.mix(FASTQC_RAW.out.zip)

    ch_bowtie2_removal_host_multiqc = Channel.empty()
    if (!params.assembly_input) {
        if (!params.skip_clipping) {
            if (params.clip_tool == 'fastp') {
                FASTP(
                    ch_raw_short_reads,
                    [],
                    params.fastp_save_trimmed_fail,
                    []
                )
                ch_short_reads_prepped = FASTP.out.reads
                ch_versions = ch_versions.mix(FASTP.out.versions.first())
                ch_multiqc_files = ch_multiqc_files.mix(FASTP.out.json)
            }
            else if (params.clip_tool == 'adapterremoval') {

                // due to strange output file scheme in AR2, have to manually separate
                // SE/PE to allow correct pulling of reads after.
                ch_adapterremoval_in = ch_raw_short_reads.branch {
                    single: it[0]['single_end']
                    paired: !it[0]['single_end']
                }

                ADAPTERREMOVAL_PE(ch_adapterremoval_in.paired, [])
                ADAPTERREMOVAL_SE(ch_adapterremoval_in.single, [])

                ch_short_reads_prepped = Channel.empty()
                ch_short_reads_prepped = ch_short_reads_prepped.mix(ADAPTERREMOVAL_SE.out.singles_truncated, ADAPTERREMOVAL_PE.out.paired_truncated)

                ch_versions = ch_versions.mix(ADAPTERREMOVAL_PE.out.versions.first(), ADAPTERREMOVAL_SE.out.versions.first())
                ch_multiqc_files = ch_multiqc_files.mix(ADAPTERREMOVAL_PE.out.settings)
                ch_multiqc_files = ch_multiqc_files.mix(ADAPTERREMOVAL_SE.out.settings)
            }
        }
        else {
            ch_short_reads_prepped = ch_raw_short_reads
        }

        if (params.host_fasta) {
            if (params.host_fasta_bowtie2index) {
                ch_host_bowtie2index = file(params.host_fasta_bowtie2index, checkIfExists: true)
            }
            else {
                BOWTIE2_HOST_REMOVAL_BUILD(
                    ch_host_fasta
                )
                ch_host_bowtie2index = BOWTIE2_HOST_REMOVAL_BUILD.out.index
            }
        }

        if (params.host_fasta || params.host_genome) {
            BOWTIE2_HOST_REMOVAL_ALIGN(
                ch_short_reads_prepped,
                ch_host_bowtie2index
            )
            ch_short_reads_hostremoved = BOWTIE2_HOST_REMOVAL_ALIGN.out.reads
            ch_versions = ch_versions.mix(BOWTIE2_HOST_REMOVAL_ALIGN.out.versions.first())
            ch_multiqc_files = ch_multiqc_files.mix(BOWTIE2_HOST_REMOVAL_ALIGN.out.log)
        }
        else {
            ch_short_reads_hostremoved = ch_short_reads_prepped
        }

        if (!params.keep_phix) {
            BOWTIE2_PHIX_REMOVAL_BUILD(
                ch_phix_db_file
            )
            BOWTIE2_PHIX_REMOVAL_ALIGN(
                ch_short_reads_hostremoved,
                BOWTIE2_PHIX_REMOVAL_BUILD.out.index
            )
            ch_short_reads_phixremoved = BOWTIE2_PHIX_REMOVAL_ALIGN.out.reads
            ch_versions = ch_versions.mix(BOWTIE2_PHIX_REMOVAL_ALIGN.out.versions.first())
            ch_multiqc_files = ch_multiqc_files.mix(BOWTIE2_PHIX_REMOVAL_ALIGN.out.log)
        }
        else {
            ch_short_reads_phixremoved = ch_short_reads_hostremoved
        }

        if (!(params.keep_phix && params.skip_clipping && !(params.host_genome || params.host_fasta))) {
            FASTQC_TRIMMED(
                ch_short_reads_phixremoved
            )
            ch_versions = ch_versions.mix(FASTQC_TRIMMED.out.versions)
            ch_multiqc_files = ch_multiqc_files.mix(FASTQC_TRIMMED.out.zip)
        }

        // Run/Lane merging

        ch_short_reads_forcat = ch_short_reads_phixremoved
            .map { meta, reads ->
                def meta_new = meta - meta.subMap('run')
                [meta_new, reads]
            }
            .groupTuple()
            .branch { meta, reads ->
                cat: reads.size() >= 2
                skip_cat: true
            }

        CAT_FASTQ(ch_short_reads_forcat.cat.map { meta, reads -> [meta, reads.flatten()] })

        // Ensure we don't have nests of nests so that structure is in form expected for assembly
        ch_short_reads_catskipped = ch_short_reads_forcat.skip_cat.map { meta, reads ->
            def new_reads = meta.single_end ? reads[0] : reads.flatten()
            [meta, new_reads]
        }

        // Combine single run and multi-run-merged data
        ch_short_reads = Channel.empty()
        ch_short_reads = CAT_FASTQ.out.reads.mix(ch_short_reads_catskipped)
        ch_versions = ch_versions.mix(CAT_FASTQ.out.versions.first())

        if (params.bbnorm) {
            if (params.coassemble_group) {
                // Interleave pairs, to be able to treat them as single ends when calling bbnorm. This prepares
                // for dropping the single_end parameter, but keeps assembly modules as they are, i.e. not
                // accepting a mix of single end and pairs.
                SEQTK_MERGEPE(
                    ch_short_reads.filter { !it[0].single_end }
                )
                ch_versions = ch_versions.mix(SEQTK_MERGEPE.out.versions.first())
                // Combine the interleaved pairs with any single end libraries. Set the meta.single_end to true (used by the bbnorm module).
                ch_bbnorm = SEQTK_MERGEPE.out.reads
                    .mix(ch_short_reads.filter { it[0].single_end })
                    .map { [[id: sprintf("group%s", it[0].group), group: it[0].group, single_end: true], it[1]] }
                    .groupTuple()
            }
            else {
                ch_bbnorm = ch_short_reads
            }
            BBMAP_BBNORM(ch_bbnorm)
            ch_versions = ch_versions.mix(BBMAP_BBNORM.out.versions)
            ch_short_reads_assembly = BBMAP_BBNORM.out.fastq
        }
        else {
            ch_short_reads_assembly = ch_short_reads
        }
    }
    else {
        ch_short_reads = ch_raw_short_reads.map { meta, reads ->
            def meta_new = meta - meta.subMap('run')
            [meta_new, reads]
        }
    }

    emit:
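The emit: block itself is collapsed in this view. Judging only from the channels workflows/mag.nf reads off the subworkflow below, it has to expose at least the following; this is an inferred sketch, not the file's actual contents, and the real block may well emit more (e.g. MultiQC inputs).

    emit:
    short_reads          = ch_short_reads
    short_reads_assembly = ch_short_reads_assembly
    versions             = ch_versions
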
27 changes: 10 additions & 17 deletions workflows/mag.nf
With the conditional moved into the subworkflow, SHORTREAD_PREPROCESSING is now called unconditionally here, and the old if (!params.assembly_input) wrapper (together with its else branch that run-merged the raw reads) is removed from mag.nf. The hunk after the change:

@@ -188,24 +188,17 @@ workflow MAG {

    ================================================================================
    */

    SHORTREAD_PREPROCESSING(
        ch_raw_short_reads,
        ch_host_fasta,
        ch_phix_db_file,
        ch_metaeuk_db
    )

    ch_versions = ch_versions.mix(SHORTREAD_PREPROCESSING.out.versions)
    ch_short_reads = SHORTREAD_PREPROCESSING.out.short_reads
    ch_short_reads_assembly = SHORTREAD_PREPROCESSING.out.short_reads_assembly

    /*
    ================================================================================
