Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory usage by GroupReadsByUmi in a corner case #774

Merged
merged 6 commits into from
Feb 20, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ addDependencyTreePlugin

addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.10")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.15.0")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.4")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.0.1")
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/com/fulcrumgenomics/umi/CorrectUmis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ class CorrectUmis
case Some(umi: String) =>
val sequences = umi.split('-')
if (sequences.exists(_.length != umiLength)) {
if (wrongLengthRecords == 0) logger.warning(s"Read (${rec.name}) detected with unexpected length UMI(s): ${sequences.mkString(" ")}")
if (wrongLengthRecords == 0) {
logger.warning(s"Read (${rec.name}) detected with unexpected length UMI(s): ${sequences.mkString(" ")}.")
logger.warning(s"Expected UMI length: ${umiLength}")
Comment on lines +189 to +190
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the unrelated change here. I had a stupid type that took me far too long to figure out because I used -u instead of -U and CorrectUmis happily decided my filename was the sole UMI sequence to correct to. It the message here had told me my expcted UMI length was 30+ that would have helped!

}
wrongLengthRecords += 1
rejectOut.foreach(w => w += rec)
}
Expand Down
89 changes: 78 additions & 11 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ object GroupReadsByUmi {
/** Take in a sequence of UMIs and assign each UMI to a unique UMI group ID. */
def assign(rawUmis: Seq[Umi]) : Map[Umi, MoleculeId]

/** Returns true if the two UMIs are the same. */
def isSameUmi(a: Umi, b: Umi): Boolean = a == b

/** Returns a canonical form of the UMI that is the same for all reads with the same UMI. */
def canonicalize(u: Umi): Umi = u

/** Default implementation of a method to retrieve the next ID based on a counter. */
protected def nextId: MoleculeId = this.counter.getAndIncrement().toString

Expand Down Expand Up @@ -259,13 +265,35 @@ object GroupReadsByUmi {
/** String that is prefixed onto the UMI from the read with that maps to a higher coordinate in the genome.. */
private[umi] val higherReadUmiPrefix: String = ("b" * (maxMismatches+1)) + ":"


/** Returns true if the two UMIs are the same. */
final override def isSameUmi(a: Umi, b: Umi): Boolean = {
if (a == b) true else {
val (a1, a2) = split(a)
tfenne marked this conversation as resolved.
Show resolved Hide resolved
val (b1, b2) = split(b)
a1 == b2 && a2 == b1
}
}

/** Returns the UMI with the lexically lower half first. */
override def canonicalize(u: Umi): Umi = {
val (a, b) = split(u)
if (a < b) u else s"${b}-${a}"
}

/** Splits the paired UMI into it's two parts. */
tfenne marked this conversation as resolved.
Show resolved Hide resolved
@inline private def split(umi: Umi): (Umi, Umi) = {
val index = umi.indexOf('-')
if (index == -1) throw new IllegalStateException(s"UMI $umi is not a paired UMI.")
val first = umi.substring(0, index)
val second = umi.substring(index+1, umi.length)
(first, second)
}

/** Takes a UMI of the form "A-B" and returns "B-A". */
def reverse(umi: Umi): Umi = umi.indexOf('-') match {
case -1 => throw new IllegalStateException(s"UMI $umi is not a paired UMI.")
case i =>
val first = umi.substring(0, i)
val second = umi.substring(i+1, umi.length)
second + '-' + first
def reverse(umi: Umi): Umi = {
val (first, second) = split(umi)
s"${second}-${first}"
}

/** Turns each UMI into the lexically earlier of A-B or B-A and then counts them. */
Expand Down Expand Up @@ -441,6 +469,17 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

/** True if no differences in UMIs are tolerated and the Molecular ID tag is MI, false otherwise. True here enables
* an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI thus
* reducing the number in memory. This is helpful since edits=0 is often used for data that has
tfenne marked this conversation as resolved.
Show resolved Hide resolved
* high numbers of reads with the same start/stop coordinates.
* We do this be setting the MI tag to the canonicalized, (optionally truncated) UMI prior to sorting, so that
tfenne marked this conversation as resolved.
Show resolved Hide resolved
* reads with the same UMI are grouped together in the sorted stream of records.
*/
private val canTakeNextGroupByUmi =
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
val mateMqOk = if (rec.unpaired) true else rec.get[Int](SAMTag.MQ.name()) match {
Expand Down Expand Up @@ -480,7 +519,24 @@ class GroupReadsByUmi
}
} || { filterUmisTooShort += 1; false}
}
.foreach(r => { sorter += r; kept += 1; sortProgress.record(r) })
.foreach { r =>
// If we're able to also group by the UMI because edits aren't allowed, push the trimmed, canonicalized UMI
// into the assign tag (which must be MI if cantakeNextGroupByUmi is true), since that is used by the
// SamOrder to sort the reads _and_ we'll overwrite it on the way out!
tfenne marked this conversation as resolved.
Show resolved Hide resolved
if (this.canTakeNextGroupByUmi) {
val umi = this.assigner.canonicalize(r[String](rawTag).toUpperCase)
val truncated = this.minUmiLength match {
case None => umi
case Some(n) => umi.substring(0, n)
}

r(this.assignTag) = truncated
}

sorter += r
kept += 1
sortProgress.record(r)
}

logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
Expand Down Expand Up @@ -531,11 +587,22 @@ class GroupReadsByUmi

/** Consumes the next group of templates with all matching end positions and returns them. */
def takeNextGroup(iterator: BufferedIterator[Template]) : Seq[Template] = {
val first = iterator.next()
val first = iterator.next()
val firstEnds = ReadInfo(first.r1.getOrElse(fail(s"R1 missing for template ${first.name}")))
val buffer = ListBuffer[Template]()
while (iterator.hasNext && firstEnds == ReadInfo(iterator.head.r1.get)) buffer += iterator.next()
first :: buffer.toList
val firstUmi = first.r1.get.apply[String](this.assignTag)
val builder = IndexedSeq.newBuilder[Template]
builder += first

while (
iterator.hasNext &&
firstEnds == ReadInfo(iterator.head.r1.get) &&
// This last condition only works because we put a canonicalized UMI into rec(assignTag) if canTakeNextGroupByUmi
(!canTakeNextGroupByUmi || firstUmi == iterator.head.r1.get.apply[String](this.assignTag))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this does change behavior in a subtle way. Suppose we have three templates that have all but the same assign tag. Let's say AAAA, GGGGG, GGGGT, with --min-umi-length=3.

Previously, all templates would be read into memory, and truncateUmis would truncate to the length of smallest UMI observed in the group of templates, in this case 4bp long due to the AAAA (not 3 as per the command line!). So the three templates would have UMIs AAAA->AAAA, GGGGG->GGGG, and GGGGT->GGGG. So we'd assign two unique molecules (AAAA and GGGG, with the last molecule containing the last two reads).

In the new implementation, we truncate the raw UMI bases based on --min-umi-length=3 to set MI for sorting. So we'd truncate to length 3 for sorting: AAAA->AAA, GGG->GGG, and GGG->GGG. When we read back in after sorting, we read in the first template by itself (only read with MI:AAA), so no truncation is applied and it stays the same (AAAA). We then read in all templates with MI having GGG, which gives the second two templates. Now we go back to the raw UMIs to find the length of the shortest UMI of the two. Both are 5bp long, so we do not truncate and keep them the same (GGGGG and GGGGT). But now these UMIs differ, so we get three molecules!

One could argue that the new implementation is an improvement, but it does change behavior subtly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hrm, that is an interesting point. So basically if you have variable length single UMIs and have edits = 0 the behavior will be subtly different. My instinct is to call this an improvement and move on, but I don't have a great sense of who (or on what kind of data) the variable length support is used, so I'm not 100% sure.

) {
builder += iterator.next()
}

builder.result()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=Strategy.Identity, edits=0, minUmiLength=Some(6)).execute()
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=strategy, edits=0, minUmiLength=Some(6)).execute()

val recs = readBamRecs(out)
recs should have length 2
Expand All @@ -363,7 +363,7 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=Strategy.Identity, edits=0, minUmiLength=Some(5)).execute()
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=None, rawTag="RX", assignTag="MI", strategy=strategy, edits=0, minUmiLength=Some(5)).execute()

val recs = readBamRecs(out)
recs should have length 4
Expand Down