Skip to content

Commit

Permalink
Adding option to filter on the internal control flag, and accompanyin…
Browse files Browse the repository at this point in the history
…g tests
  • Loading branch information
Kari Stromhaug committed Sep 24, 2021
1 parent 5e3de06 commit 4199efd
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 27 deletions.
48 changes: 32 additions & 16 deletions src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ object DemuxFastqs {
demultiplexer: FastqDemultiplexer,
threads: Int,
batchSize: Int = DemuxBatchRecordsSize,
skipFailingReads: Boolean): Iterator[DemuxResult] = {
skipFailingReads: Boolean,
omitControlReads: Boolean): Iterator[DemuxResult] = {

require(demultiplexer.expectedNumberOfReads == sources.length,
s"The demultiplexer expects ${demultiplexer.expectedNumberOfReads} reads but ${sources.length} FASTQ sources given.")
Expand All @@ -165,7 +166,8 @@ object DemuxFastqs {
else {
zippedIterator.map { readRecords => demultiplexer.demultiplex(readRecords: _*) }
}
resultIterator.filter(r => r.keep(skipFailingReads))

resultIterator.filter(r => r.keep(skipFailingReads, omitControlReads))
}
}

Expand Down Expand Up @@ -294,6 +296,8 @@ object DemuxFastqs {
|`--fastq-skip-read-numbers=true --fastq-include-sample-barcodes=true --illumina-file-names=true`
|
|By default all input reads are output. If your input FASTQs contain reads that do not pass filter (as defined by the Y/N filter flag in the FASTQ comment) these can be filtered out during demultiplexing using the `--skip-failing-reads` option.
|
|To output only reads that are not control reads, as encoded in the `<control number>` field in the header comment, use the `--omit-control-reads` flag
""",
group=ClpGroups.Fastq
)
Expand Down Expand Up @@ -345,7 +349,9 @@ class DemuxFastqs
@arg(doc="Name the output files according to the Illumina file name standards.", mutex=Array("illuminaStandards"))
var illuminaFileNames: Boolean = false,
@arg(doc="Keep only passing filter reads if true, otherwise keep all reads. Passing filter reads are determined from the comment in the FASTQ header.")
val skipFailingReads: Boolean = false
val skipFailingReads: Boolean = false,
@arg(doc="Keep only reads identified as control if true, otherwise keep all reads. Control reads are determined from the comment in the FASTQ header.")
val omitControlReads: Boolean = false
) extends FgBioTool with LazyLogging {

// Support the deprecated --illumina-standards option
Expand Down Expand Up @@ -434,7 +440,8 @@ class DemuxFastqs
maxNoCalls = maxNoCalls,
includeOriginal = this.includeAllBasesInFastqs,
fastqStandards = this.fastqStandards,
skipFailingReads = this.skipFailingReads
skipFailingReads = this.skipFailingReads,
omitControlReads = this.omitControlReads
)

val progress = ProgressLogger(this.logger, unit=1e6.toInt)
Expand All @@ -443,10 +450,11 @@ class DemuxFastqs
// DemuxRecord in parallel
val sources = inputs.map(FastqSource(_))
val iterator = demultiplexingIterator(
sources = sources,
demultiplexer = demultiplexer,
threads = threads,
skipFailingReads = this.skipFailingReads
sources = sources,
demultiplexer = demultiplexer,
threads = threads,
skipFailingReads = this.skipFailingReads,
omitControlReads = this.omitControlReads
)

// Write the records out in its own thread
Expand Down Expand Up @@ -619,22 +627,25 @@ private[fastq] object FastqDemultiplexer {
* @param numMismatches the # of mismatches if it matched a sample with a sample barcode. This will be [[Int.MaxValue]]
* for the unmatched sample.
* @param records the records, one for each read that has template bases.
* @param passQc flag noting if the read passes QC. Default true to keep all reads unless otherwise specified.
* @param isControl flag noting if the read is an internal control. Default false unless filtering to remove internal control reads.
*/
case class DemuxResult(sampleInfo: SampleInfo, numMismatches: Int, records: Seq[DemuxRecord], passQc: Boolean = true) {
case class DemuxResult(sampleInfo: SampleInfo, numMismatches: Int, records: Seq[DemuxRecord], passQc: Boolean = true, isControl: Boolean = false) {

/** Returns true if this [[DemuxResult]] should be kept for output, false otherwise.
*
* Returns true if `skipFailingReads` is false or if `passQc` is true
*
* @param skipFailingReads true to keep only passing reads, false to keep all reads
*/
def keep(skipFailingReads: Boolean = false): Boolean = {
if (!skipFailingReads) true else passQc

def keep(skipFailingReads: Boolean, omitControlReads: Boolean): Boolean = {
val keepByQc = if (!skipFailingReads) true else passQc
val keepByControl = if (!omitControlReads) true else !isControl
keepByQc && keepByControl
}
}



/** Counts the nucleotide mismatches between two strings of the same length. Ignores no calls in expectedBases. */
private[fastq] def countMismatches(observedBases: Array[Byte], expectedBases: Array[Byte]): Int = {
require(observedBases.length == expectedBases.length, s"observedBases: ${observedBases.length} expectedBases: ${expectedBases.length}")
Expand Down Expand Up @@ -669,6 +680,8 @@ private[fastq] object FastqDemultiplexer {
* @param includeOriginal true if to provide set the values for `originaBases` and `originalQuals` in [[DemuxResult]],
* namely the bases and qualities FOR ALL bases, including molecular barcode, sample barcode,
* and skipped bases.
* @param keepOnlyPassing true if to remove reads that don't pass QC, marked as 'N' in the header comment
* @param omitControlReads false if to keep reads that are marked as internal control reads in the header comment.
*/
private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo],
readStructures: Seq[ReadStructure],
Expand All @@ -678,7 +691,8 @@ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo],
val minMismatchDelta: Int = 1,
val maxNoCalls: Int = 2,
val includeOriginal: Boolean = false,
val skipFailingReads: Boolean = false) {
val skipFailingReads: Boolean = false,
val omitControlReads: Boolean = false) {
import FastqDemultiplexer._

require(readStructures.nonEmpty, "No read structures were given")
Expand Down Expand Up @@ -785,8 +799,10 @@ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo],
)
}


val passQc = demuxRecords.forall(d => d.readInfo.forall(_.passQc))
DemuxResult(sampleInfo=sampleInfo, numMismatches=numMismatches, records=demuxRecords, passQc=passQc)
val isControl = demuxRecords.forall(d => d.readInfo.forall(_.internalControl))
DemuxResult(sampleInfo=sampleInfo, numMismatches=numMismatches, records=demuxRecords, passQc=passQc, isControl=isControl)
}
}

Expand Down Expand Up @@ -850,7 +866,7 @@ object ReadInfo {
ReadInfo(
readNumber = readNumber.toInt,
passQc = keepBoolean,
internalControl = internalControl.toInt == 1,
internalControl = internalControl.toInt != 0,
sampleInfo = sampleInfo,
rest = commentFields.drop(1)
)
Expand Down
103 changes: 92 additions & 11 deletions src/test/scala/com/fulcrumgenomics/fastq/DemuxFastqsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,10 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
}

/** Helper method to create a [[FastqDemultiplexer]] */
private def dx(structures: Seq[ReadStructure], mm: Int = 2, md: Int = 1, mn: Int = 1, skipFailingReads: Boolean = false, fastqStandards: FastqStandards = FastqStandards()): FastqDemultiplexer = {

private def dx(structures: Seq[ReadStructure], mm: Int = 2, md: Int = 1, mn: Int = 1, skipFailingReads: Boolean = false, omitControlReads: Boolean = false, fastqStandards: FastqStandards = FastqStandards()): FastqDemultiplexer = {
new FastqDemultiplexer(sampleInfos=toSampleInfos(structures), readStructures=structures,
maxMismatches=mm, minMismatchDelta=md, maxNoCalls=mn, fastqStandards=fastqStandards, skipFailingReads=skipFailingReads)
maxMismatches=mm, minMismatchDelta=md, maxNoCalls=mn, fastqStandards=fastqStandards, skipFailingReads=skipFailingReads, omitControlReads=omitControlReads)
}

private def fq(name: String, bases: String, quals: Option[String]=None, comment: Option[String] = None, readNumber: Option[Int]=None): FastqRecord =
Expand Down Expand Up @@ -336,6 +337,38 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
demuxRecord.passQc shouldBe true
}

it should "set isControl to true when --omit-control-reads=true and the control field is nonzero" in {
val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=true, fastqStandards = FastqStandards(includeSampleBarcodes=true))
val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:1:SampleNumber therest"))

val demuxRecord = demuxer.demultiplex(fastqRecord)
demuxRecord.isControl shouldBe true
}

it should "set isControl to false when --omit-control-reads=true and the control field is zero" in {
val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=true, fastqStandards = FastqStandards(includeSampleBarcodes=true))
val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest"))

val demuxRecord = demuxer.demultiplex(fastqRecord)
demuxRecord.isControl shouldBe false
}

it should "set isControl to true when --omit-control-reads=false and the control field is nonzero" in {
val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=false, fastqStandards = FastqStandards(includeSampleBarcodes=true))
val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:1:SampleNumber therest"))

val demuxRecord = demuxer.demultiplex(fastqRecord)
demuxRecord.isControl shouldBe true
}

it should "set isControl to false when --omit-control-reads=false and the control field is zero" in {
val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=false, fastqStandards = FastqStandards(includeSampleBarcodes=true))
val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest"))

val demuxRecord = demuxer.demultiplex(fastqRecord)
demuxRecord.isControl shouldBe false
}


"FastqDemultiplexer.countMismatches" should "find no mismatches" in {
FastqDemultiplexer.countMismatches("GATTACA".getBytes, "GATTACA".getBytes) shouldBe 0
Expand All @@ -353,6 +386,47 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
FastqDemultiplexer.countMismatches("GATTACA".getBytes, "CTAATGT".getBytes) shouldBe 7
}

"DemuxResult.keep" should "correctly determine if a read should be kept" in {
val structures = Seq(ReadStructure("17B100T"))
val sampleInfos = toSampleInfos(structures)
val demuxRecord = DemuxRecord(name = "name", bases = "", quals = "", molecularBarcode = Seq("MB"), sampleBarcode = Seq("SB"), readNumber = 1, pairedEnd = false, comment = None)

{ // passes QC and is not an internal control
val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = true, isControl = false)
demuxResult.keep(skipFailingReads = false, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = false, omitControlReads = true) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = true) shouldBe true
}

{ // does not pass QC and is not and internal control
val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = false, isControl = false)
demuxResult.keep(skipFailingReads = false, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = false) shouldBe false
demuxResult.keep(skipFailingReads = false, omitControlReads = true) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = true) shouldBe false
}

{ // passes QC but is an internal control
val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = true, isControl = true)
demuxResult.keep(skipFailingReads = false, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = false, omitControlReads = true) shouldBe false
demuxResult.keep(skipFailingReads = true, omitControlReads = true) shouldBe false
}

{ // does not pass QC and is an internal control
val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = false, isControl = true)
demuxResult.keep(skipFailingReads = false, omitControlReads = false) shouldBe true
demuxResult.keep(skipFailingReads = true, omitControlReads = false) shouldBe false
demuxResult.keep(skipFailingReads = false, omitControlReads = true) shouldBe false
demuxResult.keep(skipFailingReads = true, omitControlReads = true) shouldBe false
}

}



private def throwableMessageShouldInclude(msg: String)(r: => Unit): Unit = {
val result = Try(r)
result.isFailure shouldBe true
Expand Down Expand Up @@ -549,29 +623,32 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
}
}

def testEndToEndWithFastqStandards(fastqStandards: FastqStandards, skipFailingReads: Boolean = false): Unit = {

def testEndToEndWithFastqStandards(fastqStandards: FastqStandards, skipFailingReads: Boolean = false, omitControlReads: Boolean = false): Unit = {
// Build the FASTQ
val fastqs = new ListBuffer[FastqRecord]()
val namePrefix = "Instrument:RunID:FlowCellID:Lane:Tile:X"
val filterFlag = if (skipFailingReads) "Y" else "N"
fastqs += fq(name=f"$namePrefix:1", comment=Some(f"1:$filterFlag:0:SampleNumber"), bases=sampleBarcode1 + "A"*100) // matches the first sample -> first sample
val controlFlag = if (!omitControlReads) "0" else "1"
fastqs += fq(name=f"$namePrefix:1", comment=Some(f"1:$filterFlag:$controlFlag:SampleNumber"), bases=sampleBarcode1 + "A"*100) // matches the first sample -> first sample
fastqs += fq(name=f"$namePrefix:2", comment=Some("2:N:0:SampleNumber"), bases="AAAAAAAAGATTACAGT" + "A"*100) // matches the first sample, one mismatch -> first sample
fastqs += fq(name=f"$namePrefix:3", comment=Some("3:N:0:SampleNumber"), bases="AAAAAAAAGATTACTTT" + "A"*100) // matches the first sample, three mismatches -> unmatched
fastqs += fq(name=f"$namePrefix:3", comment=Some(f"3:N:$controlFlag:SampleNumber"), bases="AAAAAAAAGATTACTTT" + "A"*100) // matches the first sample, three mismatches -> unmatched
fastqs += fq(name=f"$namePrefix:4", comment=Some("4:N:0:SampleNumber"), bases=sampleBarcode4 + "A"*100) // matches the 4th barcode perfectly and the 3rd barcode with two mismatches, delta too small -> unmatched
fastqs += fq(name=f"$namePrefix:5", comment=Some("5:N:0:SampleNumber"), bases="AAAAAAAAGANNNNNNN" + "A"*100) // matches the first sample, too many Ns -> unmatched
val barcodesPerSample = Seq(
if (skipFailingReads) Seq(sampleBarcode1) else Seq(sampleBarcode1, "AAAAAAAAGATTACAGT"), // sample 1

if (skipFailingReads) Seq(sampleBarcode1) else if (omitControlReads) Seq("AAAAAAAAGATTACAGT") else Seq(sampleBarcode1, "AAAAAAAAGATTACAGT"), // sample 1
Seq.empty, // sample 2
Seq.empty, // sample 3
Seq.empty, // sample 4
if (skipFailingReads) Seq.empty else Seq("AAAAAAAAGATTACTTT", sampleBarcode4, "AAAAAAAAGANNNNNNN")
if (skipFailingReads) Seq.empty else if (omitControlReads) Seq(sampleBarcode4, "AAAAAAAAGANNNNNNN") else Seq("AAAAAAAAGATTACTTT", sampleBarcode4, "AAAAAAAAGANNNNNNN")
)
val assignmentsPerSample = Seq(
if (skipFailingReads) Seq("1") else Seq("1", "2"), // sample 1
if (skipFailingReads) Seq("1") else if (omitControlReads) Seq("2") else Seq("1", "2"), // sample 1
Seq.empty, // sample 2
Seq.empty, // sample 3
Seq.empty, // sample 4
if (skipFailingReads) Seq.empty else Seq("3", "4", "5") // unmatched
if (skipFailingReads) Seq.empty else if (omitControlReads) Seq("4", "5") else Seq("3", "4", "5") // unmatched
)
val illuminaReadNamesFastqPath = makeTempFile("test", ".fastq")
Io.writeLines(illuminaReadNamesFastqPath, fastqs.map(_.toString))
Expand All @@ -592,7 +669,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
fastqIncludeSampleBarcodes = fastqStandards.includeSampleBarcodes,
illuminaFileNames = fastqStandards.illuminaFileNames,
skipFailingReads = skipFailingReads,
).execute()
omitControlReads = omitControlReads).execute()


// Check the output FASTQs
Expand Down Expand Up @@ -662,6 +739,10 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
testEndToEndWithFastqStandards(FastqStandards(includeReadNumbers=true, includeSampleBarcodes=true), skipFailingReads = true)
}

it should "demultiplex with --fastqs-include-read-numbers=true --fastqs-include-sample-barcodes=true and --omit-control-reads=false" in {
testEndToEndWithFastqStandards(FastqStandards(includeReadNumbers=true, includeSampleBarcodes=true), omitControlReads = false)
}

it should "demultiplex fragment reads with standard qualities" in {
val output = outputDir()
val metrics = makeTempFile("metrics", ".txt")
Expand Down Expand Up @@ -761,7 +842,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel {
val metrics = makeTempFile("metrics", ".txt")

new DemuxFastqs(inputs=Seq(fq1, fq2, fq3, fq4), output=output, metadata=sampleSheetPath,
readStructures=structures, metrics=Some(metrics), maxMismatches=2, minMismatchDelta=3).execute()
readStructures=structures, metrics=Some(metrics), maxMismatches=2, minMismatchDelta=3, omitControlReads = false).execute()

val sampleBarcodMetrics = Metric.read[SampleBarcodeMetric](metrics)
val sampleInfos = toSampleInfos(structures)
Expand Down

0 comments on commit 4199efd

Please sign in to comment.