Skip to content

Commit

Permalink
[feature] Reduce memory usage by GroupReadsByUmi when not allowing di…
Browse files Browse the repository at this point in the history
…ffs between UMIs (#774)

Makes a change to GroupReadsByUmi to extend how it sorts reads to include the raw UMI sequence in cases where no differences are allowed between UMIs in grouping, then leverages that information to draw reads in in small chunks for grouping.  This can cause a dramatic reduction in memory usage when used with data that has a high frequency of the same start/stop position of reads (e.g. multiplex PCR) _and_ either `--edits 0` is specified or `--strategy Identity`.

Introduces a small functional change, only in the case where a) no diffs are allowed and b) variable length UMIs are used with `--min-umi-length`.  In this case the tool will occasionally do less trimming of UMIs than it did previously, and therefore possibly create incrementally more groups.
  • Loading branch information
tfenne authored Feb 20, 2022
1 parent 9c0aa78 commit 7176170
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 15 deletions.
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ addDependencyTreePlugin

addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.4")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/com/fulcrumgenomics/umi/CorrectUmis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ class CorrectUmis
case Some(umi: String) =>
val sequences = umi.split('-')
if (sequences.exists(_.length != umiLength)) {
if (wrongLengthRecords == 0) logger.warning(s"Read (${rec.name}) detected with unexpected length UMI(s): ${sequences.mkString(" ")}")
if (wrongLengthRecords == 0) {
logger.warning(s"Read (${rec.name}) detected with unexpected length UMI(s): ${sequences.mkString(" ")}.")
logger.warning(s"Expected UMI length: ${umiLength}")
}
wrongLengthRecords += 1
rejectOut.foreach(w => w += rec)
}
Expand Down
92 changes: 81 additions & 11 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ object GroupReadsByUmi {
/** Take in a sequence of UMIs and assign each UMI to a unique UMI group ID. */
def assign(rawUmis: Seq[Umi]) : Map[Umi, MoleculeId]

/** Returns true if the two UMIs are the same. */
def isSameUmi(a: Umi, b: Umi): Boolean = a == b

/** Returns a canonical form of the UMI that is the same for all reads with the same UMI. */
def canonicalize(u: Umi): Umi = u

/** Default implementation of a method to retrieve the next ID based on a counter. */
protected def nextId: MoleculeId = this.counter.getAndIncrement().toString

Expand Down Expand Up @@ -259,13 +265,36 @@ object GroupReadsByUmi {
/** String that is prefixed onto the UMI from the read with that maps to a higher coordinate in the genome.. */
private[umi] val higherReadUmiPrefix: String = ("b" * (maxMismatches+1)) + ":"


/** Returns true if the two UMIs are the same. */
final override def isSameUmi(a: Umi, b: Umi): Boolean = {
if (a == b) true else {
// same as `a == reverse(b)` but more efficient than creating new strings
val (a1, a2) = split(a)
val (b1, b2) = split(b)
a1 == b2 && a2 == b1
}
}

/** Returns the UMI with the lexically lower half first. */
override def canonicalize(u: Umi): Umi = {
val (a, b) = split(u)
if (a < b) u else s"${b}-${a}"
}

/** Splits the paired UMI into its two parts. */
@inline private def split(umi: Umi): (Umi, Umi) = {
val index = umi.indexOf('-')
if (index == -1) throw new IllegalStateException(s"UMI $umi is not a paired UMI.")
val first = umi.substring(0, index)
val second = umi.substring(index+1, umi.length)
(first, second)
}

/** Takes a UMI of the form "A-B" and returns "B-A". */
def reverse(umi: Umi): Umi = umi.indexOf('-') match {
case -1 => throw new IllegalStateException(s"UMI $umi is not a paired UMI.")
case i =>
val first = umi.substring(0, i)
val second = umi.substring(i+1, umi.length)
second + '-' + first
def reverse(umi: Umi): Umi = {
val (first, second) = split(umi)
s"${second}-${first}"
}

/** Turns each UMI into the lexically earlier of A-B or B-A and then counts them. */
Expand Down Expand Up @@ -441,6 +470,17 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

/** True if no differences in UMIs are tolerated and the Molecular ID tag is MI, false otherwise. True here enables
* an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI thus
* reducing the number of reads in memory. This is helpful since edits=0 is often used for data that has
* high numbers of reads with the same start/stop coordinates.
* We do this by setting the MI tag to the canonicalized (optionally truncated) UMI prior to sorting, so that
* reads with the same UMI are grouped together in the sorted stream of records.
*/
private val canTakeNextGroupByUmi =
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
val mateMqOk = if (rec.unpaired) true else rec.get[Int](SAMTag.MQ.name()) match {
Expand Down Expand Up @@ -480,7 +520,26 @@ class GroupReadsByUmi
}
} || { filterUmisTooShort += 1; false}
}
.foreach(r => { sorter += r; kept += 1; sortProgress.record(r) })
.foreach { r =>
// If we're able to also group by the UMI because edits aren't allowed, push the trimmed, canonicalized UMI
// into the assign tag (which must be MI if canTakeNextGroupByUmi is true), since that is used by the
// SamOrder to sort the reads _and_ we'll overwrite it on the way out!
// Note that here we trim UMIs (if enabled) to the minimum UMI length for sorting, but that when doing the
// actual grouping later we go back to the raw tag (RX) and use as much of the UMI as possible.
if (this.canTakeNextGroupByUmi) {
val umi = this.assigner.canonicalize(r[String](rawTag).toUpperCase)
val truncated = this.minUmiLength match {
case None => umi
case Some(n) => umi.substring(0, n)
}

r(this.assignTag) = truncated
}

sorter += r
kept += 1
sortProgress.record(r)
}

logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
Expand Down Expand Up @@ -531,11 +590,22 @@ class GroupReadsByUmi

/** Consumes the next group of templates with all matching end positions and returns them. */
def takeNextGroup(iterator: BufferedIterator[Template]) : Seq[Template] = {
val first = iterator.next()
val first = iterator.next()
val firstEnds = ReadInfo(first.r1.getOrElse(fail(s"R1 missing for template ${first.name}")))
val buffer = ListBuffer[Template]()
while (iterator.hasNext && firstEnds == ReadInfo(iterator.head.r1.get)) buffer += iterator.next()
first :: buffer.toList
val firstUmi = first.r1.get.apply[String](this.assignTag)
val builder = IndexedSeq.newBuilder[Template]
builder += first

while (
iterator.hasNext &&
firstEnds == ReadInfo(iterator.head.r1.get) &&
// This last condition only works because we put a canonicalized UMI into rec(assignTag) if canTakeNextGroupByUmi
(!canTakeNextGroupByUmi || firstUmi == iterator.head.r1.get.apply[String](this.assignTag))
) {
builder += iterator.next()
}

builder.result()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=Strategy.Identity, edits=0, minUmiLength=Some(6)).execute()
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=strategy, edits=0, minUmiLength=Some(6)).execute()

val recs = readBamRecs(out)
recs should have length 2
Expand All @@ -363,7 +363,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=Strategy.Identity, edits=0, minUmiLength=Some(5)).execute()
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=strategy, edits=0, minUmiLength=Some(5)).execute()

val recs = readBamRecs(out)
recs should have length 4
Expand Down

0 comments on commit 7176170

Please sign in to comment.