Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Specify an output sort order in FilterConsensusReads #782

Merged
merged 7 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 22 additions & 5 deletions src/main/scala/com/fulcrumgenomics/bam/Bams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter}
import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.SamPairUtil.PairOrientation
import htsjdk.samtools._
import htsjdk.samtools.reference.ReferenceSequenceFileWalker
import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFileWalker}
import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil}

import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.math.{max, min}

/**
Expand Down Expand Up @@ -246,7 +245,7 @@ object Bams extends LazyLogging {
case (Some(Queryname), _) => new SelfClosingIterator(iterator.bufferBetter, () => CloserUtil.close(iterator))
case (_, _) =>
logger.info(parts = "Sorting into queryname order.")
val progress = ProgressLogger(this.logger, "Records", "sorted")
val progress = ProgressLogger(this.logger, "records", "sorted")
val sort = sorter(Queryname, header, maxInMemory, tmpDir)
iterator.foreach { rec =>
progress.record(rec)
Expand Down Expand Up @@ -412,20 +411,38 @@ object Bams extends LazyLogging {
* @param rec the SamRecord to update
* @param ref a reference sequence file walker to pull the reference information from
*/
def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): Unit = {
def regenerateNmUqMdTags(rec: SamRecord, ref: ReferenceSequenceFileWalker): SamRecord = {
if (rec.unmapped) regenerateNmUqMdTags(rec, Map.empty[Int, ReferenceSequence]) else {
val refSeq = ref.get(rec.refIndex)
regenerateNmUqMdTags(rec, Map(refSeq.getContigIndex -> refSeq))
}
rec
}

/**
* Ensures that any NM/UQ/MD tags on the read are accurate. If the read is unmapped, any existing
* values are removed. If the read is mapped all three tags will have values regenerated.
*
* @param rec the SamRecord to update
* @param ref a reference sequence file walker to pull the reference information from
*/
def regenerateNmUqMdTags(rec: SamRecord, ref: Map[Int, ReferenceSequence]): SamRecord = {
nh13 marked this conversation as resolved.
Show resolved Hide resolved
import SAMTag._
if (rec.unmapped) {
rec(NM.name()) = null
rec(UQ.name()) = null
rec(MD.name()) = null
}
else {
val refBases = ref.get(rec.refIndex).getBases
val refBases = ref.getOrElse(rec.refIndex, throw new IllegalArgumentException(
s"Record '${rec.name}' had contig index '${rec.refIndex}', but not found in the input reference map"
)).getBases
SequenceUtil.calculateMdAndNmTags(rec.asSam, refBases, true, true)
if (rec.quals != null && rec.quals.length != 0) {
rec(SAMTag.UQ.name) = SequenceUtil.sumQualitiesOfMismatches(rec.asSam, refBases, 0)
}
}
rec
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ package com.fulcrumgenomics.fasta

import htsjdk.samtools.reference.{ReferenceSequence, ReferenceSequenceFile, ReferenceSequenceFileFactory}
import com.fulcrumgenomics.commons.CommonsDef._
import com.fulcrumgenomics.commons.collection.SelfClosingIterator

object ReferenceSequenceIterator {
/** Constructs an iterator over a reference sequence from a Path to the FASTA file. */
def apply(path: PathToFasta, stripComments: Boolean = false) = {
def apply(path: PathToFasta, stripComments: Boolean = false): ReferenceSequenceIterator = {
new ReferenceSequenceIterator(ReferenceSequenceFileFactory.getReferenceSequenceFile(path, stripComments, false))
}

/** Constructs an iterator over a reference sequence from a ReferenceSequenceFile. */
def apply(ref: ReferenceSequenceFile) = {
def apply(ref: ReferenceSequenceFile): ReferenceSequenceIterator = {
new ReferenceSequenceIterator(ref)
}
}
Expand All @@ -45,8 +46,15 @@ object ReferenceSequenceIterator {
class ReferenceSequenceIterator private(private val ref: ReferenceSequenceFile) extends Iterator[ReferenceSequence] {
// The next reference sequence; will be null if there is no next :(
private var nextSequence: ReferenceSequence = ref.nextSequence()
private var isOpen: Boolean = true

override def hasNext: Boolean = this.nextSequence != null
override def hasNext: Boolean = if (this.nextSequence != null) true else {
if (isOpen) {
isOpen = false
ref.close()
}
false
}

override def next(): ReferenceSequence = {
assert(hasNext, "next() called on empty iterator")
Expand Down
92 changes: 64 additions & 28 deletions src/main/scala/com/fulcrumgenomics/umi/FilterConsensusReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,24 @@

package com.fulcrumgenomics.umi

import java.lang.Math.{max, min}

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.Bams
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.io.Writer
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.fasta.ReferenceSequenceIterator
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.NumericTypes.PhredScore
import com.fulcrumgenomics.util.{Io, ProgressLogger}
import htsjdk.samtools.SAMFileHeader.SortOrder
import htsjdk.samtools.reference.ReferenceSequenceFileWalker
import htsjdk.samtools.SAMFileHeader
import htsjdk.samtools.SAMFileHeader.GroupOrder
import htsjdk.samtools.reference.ReferenceSequence
import htsjdk.samtools.util.SequenceUtil

import java.io.Closeable
import java.lang.Math.{max, min}

/** Filter values for filtering consensus reads */
private[umi] case class ConsensusReadFilter(minReads: Int, maxReadErrorRate: Double, maxBaseErrorRate: Double)

Expand Down Expand Up @@ -114,7 +118,9 @@ class FilterConsensusReads
@arg(flag='q', doc="The minimum mean base quality across the consensus read.")
val minMeanBaseQuality: Option[PhredScore] = None,
@arg(flag='s', doc="Mask (make `N`) consensus bases where the AB and BA consensus reads disagree (for duplex-sequencing only).")
val requireSingleStrandAgreement: Boolean = false
val requireSingleStrandAgreement: Boolean = false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on the tool usage, but you seemed to miss my top-level review comment last time, so I'm replicating it here:

Top level usage on FilterConsensusReads needs updating. The second to last paragraph talks about sorting, which is now incorrect. And it should mention the need to load the whole genome into memory in addition to potentially needing memory for sorting.

@arg(flag='S', doc="The sort order of the output. If not given, query grouped if the input is also query grouped, otherwise queryname.")
nh13 marked this conversation as resolved.
Show resolved Hide resolved
val sortOrder: Option[SamOrder] = None
) extends FgBioTool with LazyLogging {
// Baseline input validation
Io.assertReadable(input)
Expand Down Expand Up @@ -169,12 +175,13 @@ class FilterConsensusReads
private val EmptyFilterResult = FilterResult(keepRead=true, maskedBases=0)

override def execute(): Unit = {
val in = SamSource(input)
val header = in.header.clone()
header.setSortOrder(SortOrder.coordinate)
val sorter = Bams.sorter(SamOrder.Coordinate, header, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
val out = SamWriter(output, header)
val progress1 = ProgressLogger(logger, verb="Filtered and masked")
logger.info("Reading the reference into memory")
nh13 marked this conversation as resolved.
Show resolved Hide resolved
val refMap = ReferenceSequenceIterator(ref, stripComments=true).map { ref => ref.getContigIndex -> ref}.toMap
logger.info(f"Read ${refMap.size}%,d contigs.")
nh13 marked this conversation as resolved.
Show resolved Hide resolved

val progress = ProgressLogger(logger, verb="Filtered and masked")
val in = SamSource(input)
val out = buildOutputWriter(in.header, refMap)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on the line below, but I wonder if in Bams.templateIterator (or rather Bams.queryGroupedIterator that is calls) we should think about trying to auto-detect query-grouping by looking at say the first 100 records if the sort order and group order are both null? Could be a separate PR, but would be nice to avoid sorting more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this tool though it might be nice to test for query grouped or sorted and emit a warning that it is preferable to supply this tool query-grouped data and defer sorting until after?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving it as is. See #799


// Go through the reads by template and do the filtering
val templateIterator = Bams.templateIterator(in, maxInMemory=MaxRecordsInMemoryWhenSorting)
Expand All @@ -201,34 +208,63 @@ class FilterConsensusReads
keptReads += primaryReadCount
totalBases += r1.length + template.r2.map(_.length).getOrElse(0)
maskedBases += r1Result.maskedBases + r2Result.maskedBases
sorter += r1
progress1.record(r1)
template.r2.foreach { r => sorter += r; progress1.record(r) }
out += r1
progress.record(r1)
template.r2.foreach { r => out += r; progress.record(r) }
nh13 marked this conversation as resolved.
Show resolved Hide resolved

template.allSupplementaryAndSecondary.foreach { r =>
val result = filterRecord(r)
if (result.keepRead) {
sorter += r
progress1.record(r)
out += r
progress.record(r)
}
}
}
}
progress.logLast()

// Then iterate the reads in coordinate order and re-calculate key tags
logger.info("Filtering complete; fixing tags and writing coordinate sorted reads.")
val progress2 = new ProgressLogger(logger, verb="Wrote")
val walker = new ReferenceSequenceFileWalker(ref.toFile)
sorter.foreach { rec =>
Bams.regenerateNmUqMdTags(rec, walker)
out += rec
progress2.record(rec)
}

logger.info("Finalizing the output")
in.safelyClose()
out.close()
logger.info(f"Output ${keptReads}%,d of ${totalReads}%,d primary consensus reads.")
logger.info(f"Masked ${maskedBases}%,d of ${totalBases}%,d bases in retained primary consensus reads.")
logger.info(f"Output $keptReads%,d of $totalReads%,d primary consensus reads.")
logger.info(f"Masked $maskedBases%,d of $totalBases%,d bases in retained primary consensus reads.")
Comment on lines +227 to +228
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you remove my {}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't need it, and IntelliJ is making it a game to get rid of the warnings

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So ... IntelliJ is not the boss of us. Put cursor into warning, option-enter, disable inspection.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really do think that Intellij is wrong on this. Since braces become necessary the moment the expression becomes more than a simple variable reference (e.g. ${masedBases / totalBases}) and the fact that braces make it much clearer where the expression ends, I try to use them 100% of the time, and would certainly prefer they not get removed.

}

/** Builds the writer to which filtered records should be written.
*
* If the input order is [[SamOrder.Queryname]] or query grouped, then the filtered records will also be in the same
* order. So if the output order specified AND does not match the the input order, sorting will occur.
nh13 marked this conversation as resolved.
Show resolved Hide resolved
*
* If the input order is not [[SamOrder.Queryname]] or query grouped, then the input records will be resorted into
* [[SamOrder.Queryname]]. So if the output order specified AND is not [[SamOrder.Queryname]], sorting will occur.
nh13 marked this conversation as resolved.
Show resolved Hide resolved
*
* Otherwise, we can skip sorting!
*
* */
private def buildOutputWriter(inHeader: SAMFileHeader, refMap: Map[Int, ReferenceSequence]): Closeable with Writer[SamRecord] = {
nh13 marked this conversation as resolved.
Show resolved Hide resolved
val outHeader = inHeader.clone()

// Check if the input will be re-sorted into QueryName, or if the input sort order will be kept
val orderAfterFiltering = SamOrder(inHeader) match {
case Some(order) if order == SamOrder.Queryname || order.groupOrder == GroupOrder.query => order
case _ => SamOrder.Queryname // input will we resorted to queryname
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will work as intended in all cases. Because we don't have SamOrder objects that have e.g. sort=unknown, group=query, it won't find a SamOrder if the header has that combination, and so this will fall to the default branch and assume sorting to queryname was done.

I think it's safer and simpler to just check if (inHeader.getSortOrder == SortOrder.queryname || inHeader.getGroupOrder == GroupOrder.queryname). And possibly we should extract this check into Bams or elsewhere so it's the same check that is used in Bams.templateIterator?

Or for extra credit, we could make a new iterator type in Bams that has a flag that just tells you whether the data got sorted or not to generate your iterator.


// Get the output order
val outputOrder = this.sortOrder
.getOrElse(orderAfterFiltering) // use the order after filtering, so no sort occurs
outputOrder.applyTo(outHeader) // remember to apply it

// Build the writer
val sort = if (orderAfterFiltering == outputOrder) None else Some(outputOrder)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will have to become a slightly more complicated check

val writer = SamWriter(output, outHeader, sort=sort, maxRecordsInRam=MaxRecordsInMemoryWhenSorting)
sort.foreach(o => logger.info(f"Output will be sorted into $o order"))

// Create the final writer based on if the full reference has been loaded, or not
new Writer[SamRecord] with Closeable {
override def write(rec: SamRecord): Unit = writer += Bams.regenerateNmUqMdTags(rec, refMap)
nh13 marked this conversation as resolved.
Show resolved Hide resolved
def close(): Unit = writer.close()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,67 @@ class FilterConsensusReadsTest extends UnitSpec {
}
}

private case class ReadNames(in: Seq[String], out: Seq[String])

private def sortOrderTest(name1: String, start1R1: Int, start1R2: Int, name2: String, start2R1: Int, start2R2: Int,
inOrder: SamOrder, outOrder: Option[SamOrder] = None): ReadNames = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that in order to test more combinations it would be better to take inSort and inGroup separately as SamOrder has a limited set of combinations.

val builder = new SamBuilder(readLength=10, baseQuality=45, sort=Some(inOrder))
builder.addPair(name=name1, start1=start1R1, start2=start1R2).foreach(r => tag(r, minDepth=4, depth=4, readErr=0f, depths=arr(4, 10), errors=arr(0,10)))
builder.addPair(name=name2, start1=start2R1, start2=start2R2).foreach(r => tag(r, minDepth=5, depth=5, readErr=0f, depths=arr(5, 10), errors=arr(0,10)))
val in = builder.toTempFile()
val out = makeTempFile("filtered.", ".bam")
new FilterConsensusReads(input=in, output=out, ref=ref, reversePerBaseTags=false,
minBaseQuality=45.toByte, minReads=Seq(3), maxReadErrorRate=Seq(0.025), maxBaseErrorRate=Seq(0.1), maxNoCallFraction=0.1,
sortOrder=outOrder
).execute()

val recs = SamSource(out).toSeq
recs.size shouldBe 4
recs.exists(_.basesString.contains("N")) shouldBe false
ReadNames(in=readBamRecs(in).map(_.name), out=recs.map(_.name))
}

it should "should output queryname sorted if the input is queryname sorted" in {
val result = sortOrderTest(
name1="q1", start1R1=101, start1R2=201,
name2="q2", start2R1=100, start2R2=200,
inOrder=SamOrder.Queryname
)
result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name!
}

it should "should output query grouped sorted if the input is query grouped sorted" in {
val result = sortOrderTest(
name1="q2", start1R1=100, start1R2=200,
name2="q1", start2R1=101, start2R2=201,
inOrder=SamOrder.TemplateCoordinate
)
result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
result.out should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
}

it should "should output queryname sorted if the input is neither queryname nor query grouped sorted" in {
val result = sortOrderTest(
name1="q2", start1R1=100, start1R2=200,
name2="q1", start2R1=101, start2R2=201,
inOrder=SamOrder.Unsorted
)
result.in should contain theSameElementsInOrderAs Seq("q2", "q2", "q1", "q1") // query grouped, but not query name
result.out should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
}

it should "should output coordinate sorted if the output order is coordinate" in {
val result = sortOrderTest(
name1="q1", start1R1=100, start1R2=200,
name2="q2", start2R1=101, start2R2=201,
inOrder=SamOrder.Queryname,
outOrder=Some(SamOrder.Coordinate)
)
result.in should contain theSameElementsInOrderAs Seq("q1", "q1", "q2", "q2") // query name
result.out should contain theSameElementsInOrderAs Seq("q1", "q2", "q1", "q2") // coordinate
}

//////////////////////////////////////////////////////////////////////////////
// Below this line are tests for filtering of duplex consensus reads.
//////////////////////////////////////////////////////////////////////////////
Expand Down