From efb058e9a00b46e320ffea6798889f3f3ffd7dd6 Mon Sep 17 00:00:00 2001 From: kstromhaug Date: Mon, 27 Sep 2021 10:06:55 -0600 Subject: [PATCH] Adding option to filter on the internal control flag, and accompanying tests (#714) * Added --omit-control-reads to omit any reads marked as control in the FASTQ read header comment --- .../fulcrumgenomics/fastq/DemuxFastqs.scala | 52 ++++--- .../fastq/DemuxFastqsTest.scala | 137 ++++++++++++++---- 2 files changed, 143 insertions(+), 46 deletions(-) diff --git a/src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala b/src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala index 4766fd44b..a258892e6 100644 --- a/src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala +++ b/src/main/scala/com/fulcrumgenomics/fastq/DemuxFastqs.scala @@ -143,7 +143,8 @@ object DemuxFastqs { demultiplexer: FastqDemultiplexer, threads: Int, batchSize: Int = DemuxBatchRecordsSize, - omitFailingReads: Boolean): Iterator[DemuxResult] = { + omitFailingReads: Boolean, + omitControlReads: Boolean): Iterator[DemuxResult] = { require(demultiplexer.expectedNumberOfReads == sources.length, s"The demultiplexer expects ${demultiplexer.expectedNumberOfReads} reads but ${sources.length} FASTQ sources given.") @@ -165,7 +166,8 @@ object DemuxFastqs { else { zippedIterator.map { readRecords => demultiplexer.demultiplex(readRecords: _*) } } - resultIterator.filter(r => r.keep(omitFailingReads)) + + resultIterator.filter(r => r.keep(omitFailingReads, omitControlReads)) } } @@ -294,6 +296,8 @@ object DemuxFastqs { |`--omit-fastq-read-numbers=true --include-sample-barcodes-in-fastq=true --illumina-file-names=true` | |By default all input reads are output. If your input FASTQs contain reads that do not pass filter (as defined by the Y/N filter flag in the FASTQ comment) these can be filtered out during demultiplexing using the `--omit-failing-reads` option. + | + |To output only reads that are not control reads, as encoded in the `` field in the header comment, use the `--omit-control-reads` flag """, group=ClpGroups.Fastq ) @@ -345,7 +349,9 @@ class DemuxFastqs @arg(doc="Name the output files according to the Illumina file name standards.", mutex=Array("illuminaStandards")) val illuminaFileNames: Boolean = false, @arg(doc="Keep only passing filter reads if true, otherwise keep all reads. Passing filter reads are determined from the comment in the FASTQ header.") - val omitFailingReads: Boolean = false + val omitFailingReads: Boolean = false, + @arg(doc="Do not keep reads identified as control if true, otherwise keep all reads. Control reads are determined from the comment in the FASTQ header.") + val omitControlReads: Boolean = false ) extends FgBioTool with LazyLogging { // Support the deprecated --illumina-standards option @@ -434,7 +440,9 @@ class DemuxFastqs maxNoCalls = maxNoCalls, includeOriginal = this.includeAllBasesInFastqs, fastqStandards = this.fastqStandards, - omitFailingReads = this.omitFailingReads) + omitFailingReads = this.omitFailingReads, + omitControlReads = this.omitControlReads + ) val progress = ProgressLogger(this.logger, unit=1e6.toInt) @@ -442,10 +450,11 @@ class DemuxFastqs // DemuxRecord in parallel val sources = inputs.map(FastqSource(_)) val iterator = demultiplexingIterator( - sources = sources, - demultiplexer = demultiplexer, - threads = threads, - omitFailingReads = this.omitFailingReads + sources = sources, + demultiplexer = demultiplexer, + threads = threads, + omitFailingReads = this.omitFailingReads, + omitControlReads = this.omitControlReads ) // Write the records out in its own thread @@ -627,22 +636,24 @@ private[fastq] object FastqDemultiplexer { * @param numMismatches the # of mismatches if it matched a sample with a sample barcode. This will be [[Int.MaxValue]] * for the unmatched sample. * @param records the records, one for each read that has template bases. + * @param passQc flag noting if the read passes QC. Default true to keep all reads unless otherwise specified. + * @param isControl flag noting if the read is an internal control. Default false unless filtering to remove internal control reads. */ - case class DemuxResult(sampleInfo: SampleInfo, numMismatches: Int, records: Seq[DemuxRecord], passQc: Boolean = true) { + case class DemuxResult(sampleInfo: SampleInfo, numMismatches: Int, records: Seq[DemuxRecord], passQc: Boolean = true, isControl: Boolean = false) { /** Returns true if this [[DemuxResult]] should be kept for output, false otherwise. - * - * Returns true if `omitFailingReads` is fase or if `passQc` is true + * Returns true if `omitFailingReads` is false or if `passQc` is true * * @param omitFailingReads true to keep only passing reads, false to keep all reads */ - def keep(omitFailingReads: Boolean = false): Boolean = { - if (!omitFailingReads) true else passQc + + def keep(omitFailingReads: Boolean, omitControlReads: Boolean): Boolean = { + val keepByQc = if (!omitFailingReads) true else passQc + val keepByControl = if (!omitControlReads) true else !isControl + keepByQc && keepByControl } } - - /** Counts the nucleotide mismatches between two strings of the same length. Ignores no calls in expectedBases. */ private[fastq] def countMismatches(observedBases: Array[Byte], expectedBases: Array[Byte]): Int = { require(observedBases.length == expectedBases.length, s"observedBases: ${observedBases.length} expectedBases: ${expectedBases.length}") @@ -677,6 +688,8 @@ private[fastq] object FastqDemultiplexer { * @param includeOriginal true if to provide set the values for `originaBases` and `originalQuals` in [[DemuxResult]], * namely the bases and qualities FOR ALL bases, including molecular barcode, sample barcode, * and skipped bases. + * @param keepOnlyPassing true if to remove reads that don't pass QC, marked as 'N' in the header comment + * @param omitControlReads false if to keep reads that are marked as internal control reads in the header comment. */ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo], readStructures: Seq[ReadStructure], @@ -686,7 +699,8 @@ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo], val minMismatchDelta: Int = 1, val maxNoCalls: Int = 2, val includeOriginal: Boolean = false, - val omitFailingReads: Boolean = false) { + val omitFailingReads: Boolean = false, + val omitControlReads: Boolean = false) { import FastqDemultiplexer._ require(readStructures.nonEmpty, "No read structures were given") @@ -793,8 +807,10 @@ private class FastqDemultiplexer(val sampleInfos: Seq[SampleInfo], ) } + val passQc = demuxRecords.forall(d => d.readInfo.forall(_.passQc)) - DemuxResult(sampleInfo=sampleInfo, numMismatches=numMismatches, records=demuxRecords, passQc=passQc) + val isControl = demuxRecords.forall(d => d.readInfo.forall(_.internalControl)) + DemuxResult(sampleInfo=sampleInfo, numMismatches=numMismatches, records=demuxRecords, passQc=passQc, isControl=isControl) } } @@ -857,7 +873,7 @@ object ReadInfo { ReadInfo( readNumber = readNumber.toInt, passQc = keepBoolean, - internalControl = internalControl.toInt == 1, + internalControl = internalControl.toInt != 0, sampleInfo = sampleInfo, rest = comments.drop(1) ) diff --git a/src/test/scala/com/fulcrumgenomics/fastq/DemuxFastqsTest.scala b/src/test/scala/com/fulcrumgenomics/fastq/DemuxFastqsTest.scala index 20c71e61a..a1a9e9022 100644 --- a/src/test/scala/com/fulcrumgenomics/fastq/DemuxFastqsTest.scala +++ b/src/test/scala/com/fulcrumgenomics/fastq/DemuxFastqsTest.scala @@ -105,9 +105,9 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { } /** Helper method to create a [[FastqDemultiplexer]] */ - private def dx(structures: Seq[ReadStructure], mm: Int = 2, md: Int = 1, mn: Int = 1, omitFailingReads: Boolean = false, fastqStandards: FastqStandards = FastqStandards()): FastqDemultiplexer = { + private def dx(structures: Seq[ReadStructure], mm: Int = 2, md: Int = 1, mn: Int = 1, omitFailingReads: Boolean = false, omitControlReads: Boolean = false, fastqStandards: FastqStandards = FastqStandards()): FastqDemultiplexer = { new FastqDemultiplexer(sampleInfos=toSampleInfos(structures), readStructures=structures, - maxMismatches=mm, minMismatchDelta=md, maxNoCalls=mn, fastqStandards=fastqStandards, omitFailingReads=omitFailingReads) + maxMismatches=mm, minMismatchDelta=md, maxNoCalls=mn, fastqStandards=fastqStandards, omitFailingReads=omitFailingReads, omitControlReads=omitControlReads) } private def fq(name: String, bases: String, quals: Option[String]=None, comment: Option[String] = None, readNumber: Option[Int]=None): FastqRecord = @@ -304,7 +304,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { verifyFragUnmatchedSample(demuxer.demultiplex(fastqRecord)) } - it should "set passQc to false when ---failing-reads=true and filter=N" in { + it should "set passQc to false when --omit-failing-reads=true and filter=N" in { val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitFailingReads = true, fastqStandards = FastqStandards(includeSampleBarcodes=true)) val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:N:0:SampleNumber therest")) @@ -312,7 +312,8 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { demuxRecord.passQc shouldBe false } - it should "set passQc to false when ---failing-reads=false and filter=N" in { + + it should "set passQc to false when --omit-failing-reads=false and filter=N" in { val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitFailingReads = false, fastqStandards = FastqStandards(includeSampleBarcodes=true)) val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:N:0:SampleNumber therest")) @@ -320,7 +321,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { demuxRecord.passQc shouldBe false } - it should "set passQc to true when ---failing-reads=true and filter=Y" in { + it should "set passQc to true when --omit-failing-reads=true and filter=Y" in { val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitFailingReads = true, fastqStandards = FastqStandards(includeSampleBarcodes=true)) val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest")) @@ -328,7 +329,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { demuxRecord.passQc shouldBe true } - it should "set passQc to true when ---failing-reads=false and filter=Y" in { + it should "set passQc to true when --omit-failing-reads=false and filter=Y" in { val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitFailingReads = false, fastqStandards = FastqStandards(includeSampleBarcodes=true)) val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest")) @@ -336,6 +337,38 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { demuxRecord.passQc shouldBe true } + it should "set isControl to true when --omit-control-reads=true and the control field is nonzero" in { + val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=true, fastqStandards = FastqStandards(includeSampleBarcodes=true)) + val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:1:SampleNumber therest")) + + val demuxRecord = demuxer.demultiplex(fastqRecord) + demuxRecord.isControl shouldBe true + } + + it should "set isControl to false when --omit-control-reads=true and the control field is zero" in { + val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=true, fastqStandards = FastqStandards(includeSampleBarcodes=true)) + val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest")) + + val demuxRecord = demuxer.demultiplex(fastqRecord) + demuxRecord.isControl shouldBe false + } + + it should "set isControl to true when --omit-control-reads=false and the control field is nonzero" in { + val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=false, fastqStandards = FastqStandards(includeSampleBarcodes=true)) + val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:1:SampleNumber therest")) + + val demuxRecord = demuxer.demultiplex(fastqRecord) + demuxRecord.isControl shouldBe true + } + + it should "set isControl to false when --omit-control-reads=false and the control field is zero" in { + val demuxer = dx(structures=Seq(ReadStructure("17B5M100T")), omitControlReads=false, fastqStandards = FastqStandards(includeSampleBarcodes=true)) + val fastqRecord = fq(name="Instrument:RunID:FlowCellID:Lane:Tile:X:Y", bases=sampleBarcode1 + "A"*100, comment=Some("1:Y:0:SampleNumber therest")) + + val demuxRecord = demuxer.demultiplex(fastqRecord) + demuxRecord.isControl shouldBe false + } + "FastqDemultiplexer.countMismatches" should "find no mismatches" in { FastqDemultiplexer.countMismatches("GATTACA".getBytes, "GATTACA".getBytes) shouldBe 0 @@ -353,6 +386,47 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { FastqDemultiplexer.countMismatches("GATTACA".getBytes, "CTAATGT".getBytes) shouldBe 7 } + "DemuxResult.keep" should "correctly determine if a read should be kept" in { + val structures = Seq(ReadStructure("17B100T")) + val sampleInfos = toSampleInfos(structures) + val demuxRecord = DemuxRecord(name = "name", bases = "", quals = "", molecularBarcode = Seq("MB"), sampleBarcode = Seq("SB"), readNumber = 1, pairedEnd = false, comment = None) + + { // passes QC and is not an internal control + val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = true, isControl = false) + demuxResult.keep(omitFailingReads = false, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = false, omitControlReads = true) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = true) shouldBe true + } + + { // does not pass QC and is not and internal control + val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = false, isControl = false) + demuxResult.keep(omitFailingReads = false, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = false) shouldBe false + demuxResult.keep(omitFailingReads = false, omitControlReads = true) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = true) shouldBe false + } + + { // passes QC but is an internal control + val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = true, isControl = true) + demuxResult.keep(omitFailingReads = false, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = false, omitControlReads = true) shouldBe false + demuxResult.keep(omitFailingReads = true, omitControlReads = true) shouldBe false + } + + { // does not pass QC and is an internal control + val demuxResult = DemuxResult(sampleInfo = sampleInfos(0), numMismatches = 0, records = Seq(demuxRecord), passQc = false, isControl = true) + demuxResult.keep(omitFailingReads = false, omitControlReads = false) shouldBe true + demuxResult.keep(omitFailingReads = true, omitControlReads = false) shouldBe false + demuxResult.keep(omitFailingReads = false, omitControlReads = true) shouldBe false + demuxResult.keep(omitFailingReads = true, omitControlReads = true) shouldBe false + } + + } + + + private def throwableMessageShouldInclude(msg: String)(r: => Unit): Unit = { val result = Try(r) result.isFailure shouldBe true @@ -549,30 +623,30 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { } } - - def testEndToEndWithFastqStandards(fastqStandards: FastqStandards, omitFailingReads: Boolean = false): Unit = { + def testEndToEndWithFastqStandards(fastqStandards: FastqStandards, omitFailingReads: Boolean = false, omitControlReads: Boolean = false): Unit = { // Build the FASTQ val fastqs = new ListBuffer[FastqRecord]() val namePrefix = "Instrument:RunID:FlowCellID:Lane:Tile:X" val filterFlag = if (omitFailingReads) "Y" else "N" - fastqs += fq(name=f"$namePrefix:1", comment=Some(f"1:$filterFlag:0:SampleNumber"), bases=sampleBarcode1 + "A"*100) // matches the first sample -> first sample + val controlFlag = if (!omitControlReads) "0" else "1" + fastqs += fq(name=f"$namePrefix:1", comment=Some(f"1:$filterFlag:$controlFlag:SampleNumber"), bases=sampleBarcode1 + "A"*100) // matches the first sample -> first sample fastqs += fq(name=f"$namePrefix:2", comment=Some("2:N:0:SampleNumber"), bases="AAAAAAAAGATTACAGT" + "A"*100) // matches the first sample, one mismatch -> first sample - fastqs += fq(name=f"$namePrefix:3", comment=Some("3:N:0:SampleNumber"), bases="AAAAAAAAGATTACTTT" + "A"*100) // matches the first sample, three mismatches -> unmatched + fastqs += fq(name=f"$namePrefix:3", comment=Some(f"3:N:$controlFlag:SampleNumber"), bases="AAAAAAAAGATTACTTT" + "A"*100) // matches the first sample, three mismatches -> unmatched fastqs += fq(name=f"$namePrefix:4", comment=Some("4:N:0:SampleNumber"), bases=sampleBarcode4 + "A"*100) // matches the 4th barcode perfectly and the 3rd barcode with two mismatches, delta too small -> unmatched fastqs += fq(name=f"$namePrefix:5", comment=Some("5:N:0:SampleNumber"), bases="AAAAAAAAGANNNNNNN" + "A"*100) // matches the first sample, too many Ns -> unmatched val barcodesPerSample = Seq( - if (omitFailingReads) Seq(sampleBarcode1) else Seq(sampleBarcode1, "AAAAAAAAGATTACAGT"), // sample 1 + if (omitFailingReads) Seq(sampleBarcode1) else if (omitControlReads) Seq("AAAAAAAAGATTACAGT") else Seq(sampleBarcode1, "AAAAAAAAGATTACAGT"), // sample 1 Seq.empty, // sample 2 Seq.empty, // sample 3 Seq.empty, // sample 4 - if (omitFailingReads) Seq.empty else Seq("AAAAAAAAGATTACTTT", sampleBarcode4, "AAAAAAAAGANNNNNNN") + if (omitFailingReads) Seq.empty else if (omitControlReads) Seq(sampleBarcode4, "AAAAAAAAGANNNNNNN") else Seq("AAAAAAAAGATTACTTT", sampleBarcode4, "AAAAAAAAGANNNNNNN") ) val assignmentsPerSample = Seq( - if (omitFailingReads) Seq("1") else Seq("1", "2"), // sample 1 + if (omitFailingReads) Seq("1") else if (omitControlReads) Seq("2") else Seq("1", "2"), // sample 1 Seq.empty, // sample 2 Seq.empty, // sample 3 Seq.empty, // sample 4 - if (omitFailingReads) Seq.empty else Seq("3", "4", "5") // unmatched + if (omitFailingReads) Seq.empty else if (omitControlReads) Seq("4", "5") else Seq("3", "4", "5") // unmatched ) val illuminaReadNamesFastqPath = makeTempFile("test", ".fastq") Io.writeLines(illuminaReadNamesFastqPath, fastqs.map(_.toString)) @@ -581,20 +655,19 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { val output = outputDir() val structures = Seq(ReadStructure("17B100T"), ReadStructure("117T")) new DemuxFastqs( - inputs = Seq(illuminaReadNamesFastqPath, illuminaReadNamesFastqPath), - output = output, - metadata = sampleSheetPath, - readStructures = structures, - metrics = None, - maxMismatches = 2, - minMismatchDelta = 3, - outputType = Some(OutputType.Fastq), - omitFastqReadNumbers = !fastqStandards.includeReadNumbers, + inputs = Seq(illuminaReadNamesFastqPath, illuminaReadNamesFastqPath), + output = output, + metadata = sampleSheetPath, + readStructures = structures, + metrics = None, + maxMismatches = 2, + minMismatchDelta = 3, + outputType = Some(OutputType.Fastq), + omitFastqReadNumbers = !fastqStandards.includeReadNumbers, includeSampleBarcodesInFastq = fastqStandards.includeSampleBarcodes, - illuminaFileNames = fastqStandards.illuminaFileNames, - omitFailingReads = omitFailingReads, - ).execute() - + illuminaFileNames = fastqStandards.illuminaFileNames, + omitFailingReads = omitFailingReads, + omitControlReads = omitControlReads).execute() // Check the output FASTQs toSampleInfos(structures).zipWithIndex.foreach { case (sampleInfo, index) => @@ -659,6 +732,14 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { testEndToEndWithFastqStandards(FastqStandards(includeReadNumbers=true, includeSampleBarcodes=true)) } + it should "demultiplex with --fastqs-include-read-numbers=true --fastqs-include-sample-barcodes=true and --omit-failing-reads=true" in { + testEndToEndWithFastqStandards(FastqStandards(includeReadNumbers=true, includeSampleBarcodes=true), omitFailingReads = true) + } + + it should "demultiplex with --fastqs-include-read-numbers=true --fastqs-include-sample-barcodes=true and --omit-control-reads=false" in { + testEndToEndWithFastqStandards(FastqStandards(includeReadNumbers=true, includeSampleBarcodes=true), omitControlReads = false) + } + it should "demultiplex fragment reads with standard qualities" in { val output = outputDir() val metrics = makeTempFile("metrics", ".txt") @@ -758,7 +839,7 @@ class DemuxFastqsTest extends UnitSpec with OptionValues with ErrorLogLevel { val metrics = makeTempFile("metrics", ".txt") new DemuxFastqs(inputs=Seq(fq1, fq2, fq3, fq4), output=output, metadata=sampleSheetPath, - readStructures=structures, metrics=Some(metrics), maxMismatches=2, minMismatchDelta=3).execute() + readStructures=structures, metrics=Some(metrics), maxMismatches=2, minMismatchDelta=3, omitControlReads = false).execute() val sampleBarcodMetrics = Metric.read[SampleBarcodeMetric](metrics) val sampleInfos = toSampleInfos(structures)