Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify an output sort order in FilterConsensusReads #782

Merged
merged 7 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/main/scala/com/fulcrumgenomics/bam/Bams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter}
import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.SamPairUtil.PairOrientation
import htsjdk.samtools._
import htsjdk.samtools.reference.ReferenceSequenceFileWalker
import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFileWalker}
import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil}

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.math.{max, min}

/**
Expand Down Expand Up @@ -246,7 +245,7 @@ object Bams extends LazyLogging {
case (Some(Queryname), _) => new SelfClosingIterator(iterator.bufferBetter, () => CloserUtil.close(iterator))
case (_, _) =>
logger.info(parts = "Sorting into queryname order.")
val progress = ProgressLogger(this.logger, "Records", "sorted")
val progress = ProgressLogger(this.logger, "records", "sorted")
val sort = sorter(Queryname, header, maxInMemory, tmpDir)
iterator.foreach { rec =>
progress.record(rec)
Expand Down Expand Up @@ -410,22 +409,23 @@ object Bams extends LazyLogging {
* values are removed. If the read is mapped all three tags will have values regenerated.
*
* @param rec the SamRecord to update
* @param ref a reference sequence file walker to pull the reference information from
* @param ref a reference sequence if the record is mapped
*/
def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): Unit = {
def regenerateNmUqMdTags(rec: SamRecord, ref: => ReferenceSequence): Unit = {
import SAMTag._
if (rec.unmapped) {
rec(NM.name()) = null
rec(UQ.name()) = null
rec(MD.name()) = null
}
else {
val refBases = ref.get(rec.refIndex).getBases
val refBases = ref.getBases
SequenceUtil.calculateMdAndNmTags(rec.asSam, refBases, true, true)
if (rec.quals != null && rec.quals.length != 0) {
rec(SAMTag.UQ.name) = SequenceUtil.sumQualitiesOfMismatches(rec.asSam, refBases, 0)
}
}
rec
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/fulcrumgenomics/bam/ClipBam.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ClipBam
val out = SamWriter(output, header, ref=Some(ref))

sorter.foreach { rec =>
Bams.regenerateNmUqMdTags(rec, walker)
Bams.regenerateNmUqMdTags(rec, walker.get(rec.refIndex))
out += rec
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ package com.fulcrumgenomics.fasta

import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFile, ReferenceSequenceFileFactory}
import com.fulcrumgenomics.commons.CommonsDef._
import com.fulcrumgenomics.commons.collection.SelfClosingIterator

object ReferenceSequenceIterator {
/** Constructs an iterator over a reference sequence from a Path to the FASTA file. */
def apply(path: PathToFasta, stripComments: Boolean = false) = {
def apply(path: PathToFasta, stripComments: Boolean = false): ReferenceSequenceIterator = {
new ReferenceSequenceIterator(ReferenceSequenceFileFactory.getReferenceSequenceFile(path, stripComments, false))
}

/** Constructs an iterator over a reference sequence from a ReferenceSequenceFile. */
def apply(ref: ReferenceSequenceFile) = {
def apply(ref: ReferenceSequenceFile): ReferenceSequenceIterator = {
new ReferenceSequenceIterator(ref)
}
}
Expand All @@ -45,8 +46,15 @@ object ReferenceSequenceIterator {
class ReferenceSequenceIterator private(private val ref: ReferenceSequenceFile) extends Iterator[ReferenceSequence] {
// The next reference sequence; will be null if there is no next :(
private var nextSequence: ReferenceSequence = ref.nextSequence()
private var isOpen: Boolean = true

override def hasNext: Boolean = this.nextSequence != null
override def hasNext: Boolean = if (this.nextSequence != null) true else {
if (isOpen) {
isOpen = false
ref.close()
}
false
}

override def next(): ReferenceSequence = {
assert(hasNext, "next() called on empty iterator")
Expand Down
118 changes: 87 additions & 31 deletions src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@

package com.fulcrumgenomics.umi

import java.lang.Math.{max, min}

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.Bams
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.io.Writer
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.fasta.ReferenceSequenceIterator
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.NumericTypes.PhredScore
import com.fulcrumgenomics.util.{Io, ProgressLogger}
import htsjdk.samtools.SAMFileHeader.SortOrder
import htsjdk.samtools.reference.ReferenceSequenceFileWalker
import com.fulcrumgenomics.util.Io
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.reference.ReferenceSequence
import htsjdk.samtools.util.SequenceUtil

import java.io.Closeable
import java.lang.Math.{max, min}

/** Filter values for filtering consensus reads */
private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Double, maxBaseErrorRate: Double)

Expand Down Expand Up @@ -87,8 +91,10 @@ private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Dou
|values two and three differ, the _more stringent value comes earlier_.
|
|In order to correctly filter reads in or out by template, if the input BAM is not `queryname` sorted or
|grouped it will be sorted into queryname order. The resulting records are `coordinate` sorted to efficiently
|correct the `NM`, `UQ` and `MD` tags, and the output BAM is always written in coordinate order.
|query grouped it will be sorted into queryname order prior to filtering.
|
|The output sort order may be specified with `--sort-order`. If not given, then the output will be in the same
|order as input if the input is queryname sorted or query grouped, otherwise queryname order.
|
|The `--reverse-tags-per-base` option controls whether per-base tags should be reversed before being used on reads
|marked as being mapped to the negative strand. This is necessary if the reads have been mapped and the
Expand All @@ -114,7 +120,9 @@ class FilterConsensusReads
@arg(flag='q', doc="The minimum mean base quality across the consensus read.")
val minMeanBaseQuality: Option[PhredScore] = None,
@arg(flag='s', doc="Mask (make `N`) consensus bases where the AB and BA consensus reads disagree (for duplex-sequencing only).")
val requireSingleStrandAgreement: Boolean = false
val requireSingleStrandAgreement: Boolean = false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on the tool usage, but you seemed to miss my top-level review comment last time, so I'm replicating it here:

Top level usage on FilterConsensusReads needs updating. The second to last paragraph talks about sorting, which is now incorrect. And it should mention the need to load the whole genome into memory in addition to potentially needing memory for sorting.

@arg(flag='S', doc="The sort order of the output. If not given, output will be in the same order as input if the input is query name sorted or query grouped, otherwise queryname order.")
val sortOrder: Option[SamOrder] = None
) extends FgBioTool with LazyLogging {
// Baseline input validation
Io.assertReadable(input)
Expand Down Expand Up @@ -169,12 +177,12 @@ class FilterConsensusReads
private val EmptyFilterResult = FilterResult(keepRead=true, maskedBases=0)

override def execute(): Unit = {
val in = SamSource(input)
val header = in.header.clone()
header.setSortOrder(SortOrder.coordinate)
val sorter = Bams.sorter(SamOrder.Coordinate, header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
val out = SamWriter(output, header)
val progress1 = ProgressLogger(logger, verb="Filtered and masked")
logger.info("Reading the reference fasta into memory")
val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap
logger.info(f"Read ${refMap.size}%,d contigs.")
nh13 marked this conversation as resolved.
Show resolved Hide resolved

val in = SamSource(input)
val out = buildOutputWriter(in.header, refMap)

// Go through the reads by template and do the filtering
val templateIterator = Bams.templateIterator(in, maxInMemory=MaxRecordsInMemoryWhenSorting)
Expand All @@ -201,34 +209,82 @@ class FilterConsensusReads
keptReads += primaryReadCount
totalBases += r1.length + template.r2.map(_.length).getOrElse(0)
maskedBases += r1Result.maskedBases + r2Result.maskedBases
sorter += r1
progress1.record(r1)
template.r2.foreach { r => sorter += r; progress1.record(r) }
out += r1
template.r2.foreach { r => out += r }

template.allSupplementaryAndSecondary.foreach { r =>
val result = filterRecord(r)
if (result.keepRead) {
sorter += r
progress1.record(r)
out += r
}
}
}
}

// Then iterate the reads in coordinate order and re-calculate key tags
logger.info("Filtering complete; fixing tags and writing coordinate sorted reads.")
val progress2 = new ProgressLogger(logger, verb="Wrote")
val walker = new ReferenceSequenceFileWalker(ref.toFile)
sorter.foreach { rec =>
Bams.regenerateNmUqMdTags(rec, walker)
out += rec
progress2.record(rec)
}

logger.info("Finalizing the output")
in.safelyClose()
out.close()
logger.info(f"Output ${keptReads}%,d of ${totalReads}%,d primary consensus reads.")
logger.info(f"Masked ${maskedBases}%,d of ${totalBases}%,d bases in retained primary consensus reads.")
logger.info(f"Output $keptReads%,d of $totalReads%,d primary consensus reads.")
logger.info(f"Masked $maskedBases%,d of $totalBases%,d bases in retained primary consensus reads.")
Comment on lines +227 to +228
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you remove my {}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need it, and IntelliJ is making it a game to get rid of the warnings

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So ... IntelliJ is not the boss of us. Put cursor into warning, option-enter, disable inspection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really do think that Intellij is wrong on this. Since braces become necessary the moment the expression becomes more than a simple variable reference (e.g. ${masedBases / totalBases}) and the fact that braces make it much clearer where the expression ends, I try to use them 100% of the time, and would certainly prefer they not get removed.

}

/** Builds the writer to which filtered records should be written.
*
* If the input order is [[SamOrder.Queryname]] or query grouped, then the filtered records will also be in the same
* order. So if the output order is specified AND does not match the the input order, sorting will occur.
*
* If the input order is not [[SamOrder.Queryname]] or query grouped, then the input records will be resorted into
* [[SamOrder.Queryname]]. So if the output order is specified AND is not [[SamOrder.Queryname]], sorting will occur.
*
* Otherwise, we can skip sorting!
*
* */
private def buildOutputWriter(inHeader: SAMFileHeader, refMap: Map[Int, ReferenceSequence]): Writer[SamRecord] with Closeable = {
val outHeader = inHeader.clone()

val inSortOrder = inHeader.getSortOrder
val inGroupOrder = inHeader.getGroupOrder
val inSubSort = Option(inHeader.getAttribute("SS"))

// Get the sort order, if any, to force the writer to resort.
val writerSortOrder = {
val canSkipSortingInput = inSortOrder == SortOrder.queryname || inGroupOrder == GroupOrder.query
(this.sortOrder, canSkipSortingInput) match {
case (None, false) =>
// already sorted to Queryname after filtering, so the output will be Queryname, and so no need to re-sort the output
SamOrder.Queryname.applyTo(outHeader)
None
case (Some(order), false) =>
// already sorted to Queryname after filtering, so don't sort the output if the given output sort order matches Queryname
order.applyTo(outHeader)
if (order != SamOrder.Queryname) Some(order) else None
case (None, true) =>
// input did not need to be re-sorted, so do not re-sort the output
None
case (Some(order), true) =>
// If sorting prior to filtering was not required, then the input was queryname or query grouped. So sort the
// output only if the specified output `sortOrder` does not match the input sort order.
if (order.sortOrder == inSortOrder && order.groupOrder == inGroupOrder && order.subSort == inSubSort) {
None // input matches the output order, no need to force sorting, keep the header
}
else { // mismatch in the input order and output order, force sorting the output
order.applyTo(outHeader)
Some(order)
}
}
}

val writer = SamWriter(output, outHeader, sort=writerSortOrder, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
writerSortOrder.foreach(o => logger.info(f"Output will be sorted into $o order"))

// Create the final writer based on if the full reference has been loaded, or not
new Writer[SamRecord] with Closeable {
override def write(rec: SamRecord): Unit = {
Bams.regenerateNmUqMdTags(rec, refMap(rec.refIndex))
writer += rec
}
def close(): Unit = writer.close()
}
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class BamsTest extends UnitSpec {
rec("NM") = 7
rec("MD") = "6A7C8T9G"
rec("UQ") = 237
Bams.regenerateNmUqMdTags(rec, DummyRefWalker)
Bams.regenerateNmUqMdTags(rec, DummyRefWalker.get(rec.refIndex))
rec.get[Int]("NM") shouldBe None
rec.get[String]("MD") shouldBe None
rec.get[Int]("UQ") shouldBe None
Expand All @@ -238,7 +238,7 @@ class BamsTest extends UnitSpec {
rec("MD") = "6A7C8T9G"
rec("UQ") = 237
rec.bases = "AAACAAAATA"
Bams.regenerateNmUqMdTags(rec, DummyRefWalker)
Bams.regenerateNmUqMdTags(rec, DummyRefWalker.get(rec.refIndex))
rec[Int]("NM") shouldBe 2
rec[String]("MD") shouldBe "3A4A1"
rec[Int]("UQ") shouldBe 40
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,67 @@ class FilterConsensusReadsTest extends UnitSpec {
}
}

private case class ReadNames(in: Seq[String], out: Seq[String])

private def sortOrderTest(name1: String, start1R1: Int, start1R2: Int, name2: String, start2R1: Int, start2R2: Int,
inOrder: SamOrder, outOrder: Option[SamOrder] = None): ReadNames = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that in order to test more combinations it would be better to take inSort and inGroup separately as SamOrder has a limited set of combinations.

val builder = new SamBuilder(readLength=10, baseQuality=45, sort=Some(inOrder))
builder.addPair(name=name1, start1=start1R1, start2=start1R2).foreach(r => tag(r, minDepth=4, depth=4, readErr=0f, depths=arr(4, 10), errors=arr(0,10)))
builder.addPair(name=name2, start1=start2R1, start2=start2R2).foreach(r => tag(r, minDepth=5, depth=5, readErr=0f, depths=arr(5, 10), errors=arr(0,10)))
val in = builder.toTempFile()
val out = makeTempFile("filtered.", ".bam")
new FilterConsensusReads(input=in, output=out, ref=ref, reversePerBaseTags=false,
minBaseQuality=45.toByte, minReads=Seq(3), maxReadErrorRate=Seq(0.025), maxBaseErrorRate=Seq(0.1), maxNoCallFraction=0.1,
sortOrder=outOrder
).execute()

val recs = SamSource(out).toSeq
recs.size shouldBe 4
recs.exists(_.basesString.contains("N")) shouldBe false
ReadNames(in=readBamRecs(in).map(_.name), out=recs.map(_.name))
}

it should "should output queryname sorted if the input is queryname sorted" in {
val result = sortOrderTest(
name1="q1", start1R1=101, start1R2=201,
name2="q2", start2R1=100, start2R2=200,
inOrder=SamOrder.Queryname
)
result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
}

it should "should output query grouped sorted if the input is query grouped sorted" in {
val result = sortOrderTest(
name1="q2", start1R1=100, start1R2=200,
name2="q1", start2R1=101, start2R2=201,
inOrder=SamOrder.TemplateCoordinate
)
result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
result.out should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
}

it should "should output queryname sorted if the input is neither queryname nor query grouped sorted" in {
val result = sortOrderTest(
name1="q2", start1R1=100, start1R2=200,
name2="q1", start2R1=101, start2R2=201,
inOrder=SamOrder.Unsorted
)
result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
}

it should "should output coordinate sorted if the output order is coordinate" in {
val result = sortOrderTest(
name1="q1", start1R1=100, start1R2=200,
name2="q2", start2R1=101, start2R2=201,
inOrder=SamOrder.Queryname,
outOrder=Some(SamOrder.Coordinate)
)
result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
result.out should contain theSameElementsInOrderAs Seq("q1", "q2", "q1", "q2") // coordinate
}

//////////////////////////////////////////////////////////////////////////////
// Below this line are tests for filtering of duplex consensus reads.
//////////////////////////////////////////////////////////////////////////////
Expand Down