Skip to content

Commit

Permalink
Add an option to mask bases below a specified quality threshold (#716)
Browse files Browse the repository at this point in the history
* Added --quality-threshold to specify a threshold to use for masking bases. Bases with a quality score below the threshold are replaced with N's
  • Loading branch information
stromka authored Sep 27, 2021
1 parent efb058e commit ee1a976
Show file tree
Hide file tree
Showing 2 changed files with 219 additions and 93 deletions.
128 changes: 82 additions & 46 deletions src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package com.fulcrumgenomics.fastq

import java.io.Closeable
import java.util.concurrent.ForkJoinPool

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
Expand All @@ -38,6 +37,7 @@ import com.fulcrumgenomics.fastq.FastqDemultiplexer.{DemuxRecord, DemuxResult}
import com.fulcrumgenomics.illumina.{Sample, SampleSheet}
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.umi.ConsensusTags
import com.fulcrumgenomics.util.NumericTypes.PhredScore
import com.fulcrumgenomics.util.ReadStructure.SubRead
import com.fulcrumgenomics.util.{ReadStructure, SampleBarcodeMetric, _}
import htsjdk.samtools.SAMFileHeader.SortOrder
Expand Down Expand Up @@ -144,28 +144,40 @@ object DemuxFastqs {
threads: Int,
batchSize: Int = DemuxBatchRecordsSize,
omitFailingReads: Boolean,
omitControlReads: Boolean): Iterator[DemuxResult] = {
omitControlReads: Boolean,
threshold: Int,
qualityEncoding: QualityEncoding): Iterator[DemuxResult] = {

require(demultiplexer.expectedNumberOfReads == sources.length,
s"The demultiplexer expects ${demultiplexer.expectedNumberOfReads} reads but ${sources.length} FASTQ sources given.")

val zippedIterator = FastqSource.zipped(sources)

if (threshold + qualityEncoding.asciiOffset > Byte.MaxValue)
throw new IllegalArgumentException(f"Quality threshold $threshold exceeds max possible value")
val thresh = (threshold + qualityEncoding.asciiOffset).toByte

val resultIterator = if (threads > 1) {
// Developer Note: Iterator does not support parallel operations, so we need to group together records into a
// [[List]] or [[Seq]]. A fixed number of records are grouped to reduce memory overhead.
val pool = new ForkJoinPool(threads)
zippedIterator
.grouped(batchSize)
.flatMap { batch =>
batch
.parWith(pool=pool)
.map { readRecords => demultiplexer.demultiplex(readRecords: _*) }
.seq
}
}
else {
zippedIterator.map { readRecords => demultiplexer.demultiplex(readRecords: _*) }
}
// Developer Note: Iterator does not support parallel operations, so we need to group together records into a
// [[List]] or [[Seq]]. A fixed number of records are grouped to reduce memory overhead.
val pool = new ForkJoinPool(threads)
zippedIterator
.grouped(batchSize)
.flatMap { batch =>
batch
.parWith(pool = pool)
.map { readRecords =>
demultiplexer.demultiplex(readRecords: _*)
.maskLowQualityBases(threshold = thresh, qualityEncoding = qualityEncoding)
}
.seq
}
}
else {
zippedIterator
.map { readRecords => demultiplexer.demultiplex(readRecords: _*) }
.map { result => result.maskLowQualityBases(threshold = thresh, qualityEncoding = qualityEncoding) }
}

resultIterator.filter(r => r.keep(omitFailingReads, omitControlReads))
}
Expand Down Expand Up @@ -351,7 +363,8 @@ class DemuxFastqs
@arg(doc="Keep only passing filter reads if true, otherwise keep all reads. Passing filter reads are determined from the comment in the FASTQ header.")
val omitFailingReads: Boolean = false,
@arg(doc="Do not keep reads identified as control if true, otherwise keep all reads. Control reads are determined from the comment in the FASTQ header.")
val omitControlReads: Boolean = false
val omitControlReads: Boolean = false,
@arg(doc="Mask bases with a quality score below the specified threshold as Ns") val qualityThreshold: Int = 0,
) extends FgBioTool with LazyLogging {

// Support the deprecated --illumina-standards option
Expand Down Expand Up @@ -454,7 +467,9 @@ class DemuxFastqs
demultiplexer = demultiplexer,
threads = threads,
omitFailingReads = this.omitFailingReads,
omitControlReads = this.omitControlReads
omitControlReads = this.omitControlReads,
threshold = qualityThreshold,
qualityEncoding = qualityEncoding
)

// Write the records out in its own thread
Expand Down Expand Up @@ -567,7 +582,7 @@ private[fastq] object FastqRecordWriter {
/** A writer that writes [[DemuxRecord]]s as [[FastqRecord]]s. */
private[fastq] class FastqRecordWriter(prefix: PathPrefix, val pairedEnd: Boolean, val fastqStandards: FastqStandards) extends DemuxWriter[FastqRecord] {
private val writers: IndexedSeq[FastqWriter] = {
FastqRecordWriter.extensions(pairedEnd=pairedEnd, illuminaFileNames=fastqStandards.illuminaFileNames).map { ext =>
FastqRecordWriter.extensions(pairedEnd = pairedEnd, illuminaFileNames = fastqStandards.illuminaFileNames).map { ext =>
FastqWriter(Io.toWriter(PathUtil.pathTo(s"$prefix$ext")))
}.toIndexedSeq
}
Expand All @@ -580,29 +595,27 @@ private[fastq] class FastqRecordWriter(prefix: PathPrefix, val pairedEnd: Boolea

// update the comment
val comment = rec.readInfo match {
case None => rec.comment // when not updating sample barcodes, fetch the comment from the record
case None => rec.comment // when not updating sample barcodes, fetch the comment from the record
case Some(info) =>
if (fastqStandards.includeSampleBarcodes && rec.sampleBarcode.nonEmpty) {
Some(info.copy(sampleInfo=rec.sampleBarcode.mkString("+")).toString) // update the barcode in the record's header. In case of dual-indexing, barcodes are combined with a '+'.
Some(info.copy(sampleInfo = rec.sampleBarcode.mkString("+")).toString) // update the barcode in the record's header. In case of dual-indexing, barcodes are combined with a '+'.
}
else Some(info.toString)
}
val record = FastqRecord(
name = name,
bases = rec.originalBases.getOrElse(rec.bases),
quals = rec.originalQuals.getOrElse(rec.quals),
comment = comment,
name = name,
bases = rec.originalBases.getOrElse(rec.bases),
quals = rec.originalQuals.getOrElse(rec.quals),
comment = comment,
readNumber = if (fastqStandards.includeReadNumbers) Some(rec.readNumber) else None
)

val writer = this.writers.lift(rec.readNumber-1).getOrElse {
val writer = this.writers.lift(rec.readNumber - 1).getOrElse {
throw new IllegalStateException(s"Read number was invalid: ${rec.readNumber}")
}
writer.write(record)
record
}


override def close(): Unit = this.writers.foreach(_.close())
}

Expand All @@ -629,7 +642,23 @@ private[fastq] object FastqDemultiplexer {
comment: Option[String],
originalBases: Option[String] = None,
originalQuals: Option[String] = None,
readInfo: Option[ReadInfo] = None)
readInfo: Option[ReadInfo] = None) {

/** Masks bases that have a quality score less than or equal to a specified threshold.
*
* @param threshold The threshold for masking bases, exlusive. Bases with a quality score < threshold will be masked
* Here the threshold has been converted to a Byte directly comparable with the quality scores.
* @param qualityEncoding The encoding used for quality scores in the Fastq file.
* @return a new DemuxRecord with updated bases
*/
def maskLowQualityBases(threshold: Byte, qualityEncoding: QualityEncoding): DemuxRecord = {
if (threshold <= 0 || this.quals.forall(_ >= threshold)) this else {
val bases = this.bases.toCharArray
for (i <- Range(0, this.quals.length)) if (quals.charAt(i).toByte < threshold) bases(i) = 'N'
this.copy(bases = bases.mkString)
}
}
}

/** A class to store the [[SampleInfo]] and associated demultiplexed [[DemuxRecord]]s.
* @param sampleInfo the [[SampleInfo]] for the matched sample.
Expand All @@ -646,12 +675,20 @@ private[fastq] object FastqDemultiplexer {
*
* @param omitFailingReads true to keep only passing reads, false to keep all reads
*/

def keep(omitFailingReads: Boolean, omitControlReads: Boolean): Boolean = {
val keepByQc = if (!omitFailingReads) true else passQc
val keepByQc = if (!omitFailingReads) true else passQc
val keepByControl = if (!omitControlReads) true else !isControl
keepByQc && keepByControl
}

/** A function to mask bases that have a quality score less than or equal to a specified threshold
* @param threshold The threshold for masking bases, exclusive. Bases with a quality score < threshold will be masked
* @return a new DemuxResult with updated bases
*/
def maskLowQualityBases(threshold: Byte, qualityEncoding: QualityEncoding): DemuxResult = {
if (threshold <= 0) this
else { this.copy(records = records.map(_.maskLowQualityBases(threshold=threshold, qualityEncoding=qualityEncoding))) }
}
}

/** Counts the nucleotide mismatches between two strings of the same length. Ignores no calls in expectedBases. */
Expand Down Expand Up @@ -782,32 +819,31 @@ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo],

// Get the molecular and sample barcodes
val molecularBarcode = bases(SegmentType.MolecularBarcode)
val sampleBarcode = bases(SegmentType.SampleBarcode)
val sampleBarcode = bases(SegmentType.SampleBarcode)

val demuxRecords = reads.zip(this.variableReadStructures)
.filter { case (_, rs) => rs.exists(_.kind == SegmentType.Template) }
.zipWithIndex
.map { case ((read, rs), readIndex) =>
val segments = rs.extract(read.bases, read.quals)
val segments = rs.extract(read.bases, read.quals)
val readNumber = readIndex + 1
val templates = segments.filter(_.kind == SegmentType.Template)
val templates = segments.filter(_.kind == SegmentType.Template)
require(templates.nonEmpty, s"Bug: require at least one template in read $readIndex; read structure was ${segments.mkString}")
DemuxRecord(
name = read.name,
bases = templates.map(_.bases).mkString,
quals = templates.map(_.quals).mkString,
name = read.name,
bases = templates.map(_.bases).mkString,
quals = templates.map(_.quals).mkString,
molecularBarcode = molecularBarcode,
sampleBarcode = sampleBarcode,
readNumber = readNumber,
pairedEnd = this.pairedEnd,
comment = read.comment,
originalBases = if (this.includeOriginal) Some(read.bases) else None,
originalQuals = if (this.includeOriginal) Some(read.quals) else None,
readInfo = fastqStandards.readInfo(read)
sampleBarcode = sampleBarcode,
readNumber = readNumber,
pairedEnd = this.pairedEnd,
comment = read.comment,
originalBases = if (this.includeOriginal) Some(read.bases) else None,
originalQuals = if (this.includeOriginal) Some(read.quals) else None,
readInfo = fastqStandards.readInfo(read)
)
}


val passQc = demuxRecords.forall(d => d.readInfo.forall(_.passQc))
val isControl = demuxRecords.forall(d => d.readInfo.forall(_.internalControl))
DemuxResult(sampleInfo=sampleInfo, numMismatches=numMismatches, records=demuxRecords, passQc=passQc, isControl=isControl)
Expand Down
Loading

0 comments on commit ee1a976

Please sign in to comment.