diff --git a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala index 6e1650dc3..81ac2dcdb 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala @@ -33,10 +33,9 @@ import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter} import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder} import htsjdk.samtools.SamPairUtil.PairOrientation import htsjdk.samtools._ -import htsjdk.samtools.reference.ReferenceSequenceFileWalker +import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFileWalker} import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil} -import scala.collection.mutable.{ArrayBuffer, ListBuffer} import scala.math.{max, min} /** @@ -246,7 +245,7 @@ object Bams extends LazyLogging { case (Some(Queryname), _) => new SelfClosingIterator(iterator.bufferBetter, () => CloserUtil.close(iterator)) case (_, _) => logger.info(parts = "Sorting into queryname order.") - val progress = ProgressLogger(this.logger, "Records", "sorted") + val progress = ProgressLogger(this.logger, "records", "sorted") val sort = sorter(Queryname, header, maxInMemory, tmpDir) iterator.foreach { rec => progress.record(rec) @@ -410,9 +409,9 @@ object Bams extends LazyLogging { * values are removed. If the read is mapped all three tags will have values regenerated. * * @param rec the SamRecord to update - * @param ref a reference sequence file walker to pull the reference information from + * @param ref a reference sequence if the record is mapped */ - def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): Unit = { + def regenerateNmUqMdTags(rec: SamRecord, ref: => ReferenceSequence): Unit = { import SAMTag._ if (rec.unmapped) { rec(NM.name()) = null @@ -420,12 +419,13 @@ object Bams extends LazyLogging { rec(MD.name()) = null } else { - val refBases = ref.get(rec.refIndex).getBases + val refBases = ref.getBases SequenceUtil.calculateMdAndNmTags(rec.asSam, refBases, true, true) if (rec.quals != null && rec.quals.length != 0) { rec(SAMTag.UQ.name) = SequenceUtil.sumQualitiesOfMismatches(rec.asSam, refBases, 0) } } + rec } /** diff --git a/src/main/scala/com/fulcrumgenomics/bam/ClipBam.scala b/src/main/scala/com/fulcrumgenomics/bam/ClipBam.scala index f74998a5f..290ab2104 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/ClipBam.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/ClipBam.scala @@ -128,7 +128,7 @@ class ClipBam val out = SamWriter(output, header, ref=Some(ref)) sorter.foreach { rec => - Bams.regenerateNmUqMdTags(rec, walker) + Bams.regenerateNmUqMdTags(rec, walker.get(rec.refIndex)) out += rec } diff --git a/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala b/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala index da0d0c129..3f94de957 100644 --- a/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala +++ b/src/main/scala/com/fulcrumgenomics/fasta/ReferenceSequenceIterator.scala @@ -25,15 +25,16 @@ package com.fulcrumgenomics.fasta import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFile, ReferenceSequenceFileFactory} import com.fulcrumgenomics.commons.CommonsDef._ +import com.fulcrumgenomics.commons.collection.SelfClosingIterator object ReferenceSequenceIterator { /** Constructs an iterator over a reference sequence from a Path to the FASTA file. */ - def apply(path: PathToFasta, stripComments: Boolean = false) = { + def apply(path: PathToFasta, stripComments: Boolean = false): ReferenceSequenceIterator = { new ReferenceSequenceIterator(ReferenceSequenceFileFactory.getReferenceSequenceFile(path, stripComments, false)) } /** Constructs an iterator over a reference sequence from a ReferenceSequenceFile. */ - def apply(ref: ReferenceSequenceFile) = { + def apply(ref: ReferenceSequenceFile): ReferenceSequenceIterator = { new ReferenceSequenceIterator(ref) } } @@ -45,8 +46,15 @@ object ReferenceSequenceIterator { class ReferenceSequenceIterator private(private val ref: ReferenceSequenceFile) extends Iterator[ReferenceSequence] { // The next reference sequence; will be null if there is no next :( private var nextSequence: ReferenceSequence = ref.nextSequence() + private var isOpen: Boolean = true - override def hasNext: Boolean = this.nextSequence != null + override def hasNext: Boolean = if (this.nextSequence != null) true else { + if (isOpen) { + isOpen = false + ref.close() + } + false + } override def next(): ReferenceSequence = { assert(hasNext, "next() called on empty iterator") diff --git a/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala b/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala index 1c79907d3..4c594d9de 100644 --- a/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala +++ b/src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala @@ -24,20 +24,24 @@ package com.fulcrumgenomics.umi -import java.lang.Math.{max, min} - import com.fulcrumgenomics.FgBioDef._ import com.fulcrumgenomics.bam.Bams import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter} import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} +import com.fulcrumgenomics.commons.io.Writer import com.fulcrumgenomics.commons.util.LazyLogging +import com.fulcrumgenomics.fasta.ReferenceSequenceIterator import com.fulcrumgenomics.sopt.{arg, clp} import com.fulcrumgenomics.util.NumericTypes.PhredScore -import com.fulcrumgenomics.util.{Io, ProgressLogger} -import htsjdk.samtools.SAMFileHeader.SortOrder -import htsjdk.samtools.reference.ReferenceSequenceFileWalker +import com.fulcrumgenomics.util.Io +import htsjdk.samtools.SAMFileHeader +import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder} +import htsjdk.samtools.reference.ReferenceSequence import htsjdk.samtools.util.SequenceUtil +import java.io.Closeable +import java.lang.Math.{max, min} + /** Filter values for filtering consensus reads */ private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Double, maxBaseErrorRate: Double) @@ -87,8 +91,10 @@ private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Dou |values two and three differ, the _more stringent value comes earlier_. | |In order to correctly filter reads in or out by template, if the input BAM is not `queryname` sorted or - |grouped it will be sorted into queryname order. The resulting records are `coordinate` sorted to efficiently - |correct the `NM`, `UQ` and `MD` tags, and the output BAM is always written in coordinate order. + |query grouped it will be sorted into queryname order prior to filtering. + | + |The output sort order may be specified with `--sort-order`. If not given, then the output will be in the same + |order as input if the input is queryname sorted or query grouped, otherwise queryname order. | |The `--reverse-tags-per-base` option controls whether per-base tags should be reversed before being used on reads |marked as being mapped to the negative strand. This is necessary if the reads have been mapped and the @@ -114,7 +120,9 @@ class FilterConsensusReads @arg(flag='q', doc="The minimum mean base quality across the consensus read.") val minMeanBaseQuality: Option[PhredScore] = None, @arg(flag='s', doc="Mask (make `N`) consensus bases where the AB and BA consensus reads disagree (for duplex-sequencing only).") - val requireSingleStrandAgreement: Boolean = false + val requireSingleStrandAgreement: Boolean = false, + @arg(flag='S', doc="The sort order of the output. If not given, output will be in the same order as input if the input is query name sorted or query grouped, otherwise queryname order.") + val sortOrder: Option[SamOrder] = None ) extends FgBioTool with LazyLogging { // Baseline input validation Io.assertReadable(input) @@ -169,12 +177,12 @@ class FilterConsensusReads private val EmptyFilterResult = FilterResult(keepRead=true, maskedBases=0) override def execute(): Unit = { - val in = SamSource(input) - val header = in.header.clone() - header.setSortOrder(SortOrder.coordinate) - val sorter = Bams.sorter(SamOrder.Coordinate, header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting) - val out = SamWriter(output, header) - val progress1 = ProgressLogger(logger, verb="Filtered and masked") + logger.info("Reading the reference fasta into memory") + val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap + logger.info(f"Read ${refMap.size}%,d contigs.") + + val in = SamSource(input) + val out = buildOutputWriter(in.header, refMap) // Go through the reads by template and do the filtering val templateIterator = Bams.templateIterator(in, maxInMemory=MaxRecordsInMemoryWhenSorting) @@ -201,34 +209,82 @@ class FilterConsensusReads keptReads += primaryReadCount totalBases += r1.length + template.r2.map(_.length).getOrElse(0) maskedBases += r1Result.maskedBases + r2Result.maskedBases - sorter += r1 - progress1.record(r1) - template.r2.foreach { r => sorter += r; progress1.record(r) } + out += r1 + template.r2.foreach { r => out += r } template.allSupplementaryAndSecondary.foreach { r => val result = filterRecord(r) if (result.keepRead) { - sorter += r - progress1.record(r) + out += r } } } } - // Then iterate the reads in coordinate order and re-calculate key tags - logger.info("Filtering complete; fixing tags and writing coordinate sorted reads.") - val progress2 = new ProgressLogger(logger, verb="Wrote") - val walker = new ReferenceSequenceFileWalker(ref.toFile) - sorter.foreach { rec => - Bams.regenerateNmUqMdTags(rec, walker) - out += rec - progress2.record(rec) - } - + logger.info("Finalizing the output") in.safelyClose() out.close() - logger.info(f"Output ${keptReads}%,d of ${totalReads}%,d primary consensus reads.") - logger.info(f"Masked ${maskedBases}%,d of ${totalBases}%,d bases in retained primary consensus reads.") + logger.info(f"Output $keptReads%,d of $totalReads%,d primary consensus reads.") + logger.info(f"Masked $maskedBases%,d of $totalBases%,d bases in retained primary consensus reads.") + } + + /** Builds the writer to which filtered records should be written. + * + * If the input order is [[SamOrder.Queryname]] or query grouped, then the filtered records will also be in the same + * order. So if the output order is specified AND does not match the the input order, sorting will occur. + * + * If the input order is not [[SamOrder.Queryname]] or query grouped, then the input records will be resorted into + * [[SamOrder.Queryname]]. So if the output order is specified AND is not [[SamOrder.Queryname]], sorting will occur. + * + * Otherwise, we can skip sorting! + * + * */ + private def buildOutputWriter(inHeader: SAMFileHeader, refMap: Map[Int, ReferenceSequence]): Writer[SamRecord] with Closeable = { + val outHeader = inHeader.clone() + + val inSortOrder = inHeader.getSortOrder + val inGroupOrder = inHeader.getGroupOrder + val inSubSort = Option(inHeader.getAttribute("SS")) + + // Get the sort order, if any, to force the writer to resort. + val writerSortOrder = { + val canSkipSortingInput = inSortOrder == SortOrder.queryname || inGroupOrder == GroupOrder.query + (this.sortOrder, canSkipSortingInput) match { + case (None, false) => + // already sorted to Queryname after filtering, so the output will be Queryname, and so no need to re-sort the output + SamOrder.Queryname.applyTo(outHeader) + None + case (Some(order), false) => + // already sorted to Queryname after filtering, so don't sort the output if the given output sort order matches Queryname + order.applyTo(outHeader) + if (order != SamOrder.Queryname) Some(order) else None + case (None, true) => + // input did not need to be re-sorted, so do not re-sort the output + None + case (Some(order), true) => + // If sorting prior to filtering was not required, then the input was queryname or query grouped. So sort the + // output only if the specified output `sortOrder` does not match the input sort order. + if (order.sortOrder == inSortOrder && order.groupOrder == inGroupOrder && order.subSort == inSubSort) { + None // input matches the output order, no need to force sorting, keep the header + } + else { // mismatch in the input order and output order, force sorting the output + order.applyTo(outHeader) + Some(order) + } + } + } + + val writer = SamWriter(output, outHeader, sort=writerSortOrder, maxRecordsInRam=MaxRecordsInMemoryWhenSorting) + writerSortOrder.foreach(o => logger.info(f"Output will be sorted into $o order")) + + // Create the final writer based on if the full reference has been loaded, or not + new Writer[SamRecord] with Closeable { + override def write(rec: SamRecord): Unit = { + Bams.regenerateNmUqMdTags(rec, refMap(rec.refIndex)) + writer += rec + } + def close(): Unit = writer.close() + } } /** diff --git a/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala b/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala index d7483d196..6ad22a684 100644 --- a/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala +++ b/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala @@ -225,7 +225,7 @@ class BamsTest extends UnitSpec { rec("NM") = 7 rec("MD") = "6A7C8T9G" rec("UQ") = 237 - Bams.regenerateNmUqMdTags(rec, DummyRefWalker) + Bams.regenerateNmUqMdTags(rec, DummyRefWalker.get(rec.refIndex)) rec.get[Int]("NM") shouldBe None rec.get[String]("MD") shouldBe None rec.get[Int]("UQ") shouldBe None @@ -238,7 +238,7 @@ class BamsTest extends UnitSpec { rec("MD") = "6A7C8T9G" rec("UQ") = 237 rec.bases = "AAACAAAATA" - Bams.regenerateNmUqMdTags(rec, DummyRefWalker) + Bams.regenerateNmUqMdTags(rec, DummyRefWalker.get(rec.refIndex)) rec[Int]("NM") shouldBe 2 rec[String]("MD") shouldBe "3A4A1" rec[Int]("UQ") shouldBe 40 diff --git a/src/test/scala/com/fulcrumgenomics/umi/FilterConsensusReadsTest.scala b/src/test/scala/com/fulcrumgenomics/umi/FilterConsensusReadsTest.scala index 4774ebea5..2e08c4595 100644 --- a/src/test/scala/com/fulcrumgenomics/umi/FilterConsensusReadsTest.scala +++ b/src/test/scala/com/fulcrumgenomics/umi/FilterConsensusReadsTest.scala @@ -290,6 +290,67 @@ class FilterConsensusReadsTest extends UnitSpec { } } + private case class ReadNames(in: Seq[String], out: Seq[String]) + + private def sortOrderTest(name1: String, start1R1: Int, start1R2: Int, name2: String, start2R1: Int, start2R2: Int, + inOrder: SamOrder, outOrder: Option[SamOrder] = None): ReadNames = { + val builder = new SamBuilder(readLength=10, baseQuality=45, sort=Some(inOrder)) + builder.addPair(name=name1, start1=start1R1, start2=start1R2).foreach(r => tag(r, minDepth=4, depth=4, readErr=0f, depths=arr(4, 10), errors=arr(0,10))) + builder.addPair(name=name2, start1=start2R1, start2=start2R2).foreach(r => tag(r, minDepth=5, depth=5, readErr=0f, depths=arr(5, 10), errors=arr(0,10))) + val in = builder.toTempFile() + val out = makeTempFile("filtered.", ".bam") + new FilterConsensusReads(input=in, output=out, ref=ref, reversePerBaseTags=false, + minBaseQuality=45.toByte, minReads=Seq(3), maxReadErrorRate=Seq(0.025), maxBaseErrorRate=Seq(0.1), maxNoCallFraction=0.1, + sortOrder=outOrder + ).execute() + + val recs = SamSource(out).toSeq + recs.size shouldBe 4 + recs.exists(_.basesString.contains("N")) shouldBe false + ReadNames(in=readBamRecs(in).map(_.name), out=recs.map(_.name)) + } + + it should "should output queryname sorted if the input is queryname sorted" in { + val result = sortOrderTest( + name1="q1", start1R1=101, start1R2=201, + name2="q2", start2R1=100, start2R2=200, + inOrder=SamOrder.Queryname + ) + result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name! + result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name! + } + + it should "should output query grouped sorted if the input is query grouped sorted" in { + val result = sortOrderTest( + name1="q2", start1R1=100, start1R2=200, + name2="q1", start2R1=101, start2R2=201, + inOrder=SamOrder.TemplateCoordinate + ) + result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name + result.out should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name + } + + it should "should output queryname sorted if the input is neither queryname nor query grouped sorted" in { + val result = sortOrderTest( + name1="q2", start1R1=100, start1R2=200, + name2="q1", start2R1=101, start2R2=201, + inOrder=SamOrder.Unsorted + ) + result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name + result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name + } + + it should "should output coordinate sorted if the output order is coordinate" in { + val result = sortOrderTest( + name1="q1", start1R1=100, start1R2=200, + name2="q2", start2R1=101, start2R2=201, + inOrder=SamOrder.Queryname, + outOrder=Some(SamOrder.Coordinate) + ) + result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name + result.out should contain theSameElementsInOrderAs Seq("q1", "q2", "q1", "q2") // coordinate + } + ////////////////////////////////////////////////////////////////////////////// // Below this line are tests for filtering of duplex consensus reads. //////////////////////////////////////////////////////////////////////////////