Skip to content

Commit

Permalink
GroupReadsByUmi only sort input if it is not TemplateCoordinate sorted
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 committed Mar 3, 2022
1 parent 6e0a53d commit 86e8980
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 74 deletions.
118 changes: 73 additions & 45 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
package com.fulcrumgenomics.umi

import java.util.concurrent.atomic.AtomicLong

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate
import com.fulcrumgenomics.bam.{Bams, Template}
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
Expand Down Expand Up @@ -102,8 +102,8 @@ object GroupReadsByUmi {

val r1Earlier =
(ref1 < ref2) ||
(ref1 == ref2 && start1 < start2) ||
(ref1 == ref2 && start1 == start2 && strand1 < strand2)
(ref1 == ref2 && start1 < start2) ||
(ref1 == ref2 && start1 == start2 && strand1 < strand2)

if (r1Earlier) new ReadInfo(ref1, start1, strand1, ref2, start2, strand2, lib)
else new ReadInfo(ref2, start2, strand2, ref1, start1, strand1, lib)
Expand Down Expand Up @@ -381,7 +381,7 @@ object GroupReadsByUmi {
* the main assignment method to filter out the ones that shouldn't be in the Map.
*/
override def assign(rawUmis: Seq[Umi]): Map[Umi, MoleculeId] = {
super.assign(rawUmis).filter { case (umi, id) => rawUmis.contains(umi) }
super.assign(rawUmis).filter { case (umi, _) => rawUmis.contains(umi) }
}
}
}
Expand Down Expand Up @@ -469,6 +469,9 @@ object Strategy extends FgBioEnum[Strategy] {
|compared, where `len` is the length of the shortest UMI. The UMI length is the number of [ACGT] bases in the UMI
|(i.e. does not count dashes and other non-ACGT characters). This option is not implemented for reads with UMI pairs
|(i.e. using the paired assigner).
|
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The ouitput will be written in template-coordinate order.
"""
)
class GroupReadsByUmi
Expand Down Expand Up @@ -496,16 +499,6 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

/** True if no differences in UMIs are tolerated and the Molecular ID tag is MI, false otherwise. True here enables
* an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI thus
* reducing the number of reads in memory. This is helpful since edits=0 is often used for data that has
* high numbers of reads with the same start/stop coordinates.
* We do this by setting the MI tag to the canonicalized (optionally truncated) UMI prior to sorting, so that
* reads with the same UMI are grouped together in the sorted stream of records.
*/
private val canTakeNextGroupByUmi =
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
Expand All @@ -523,22 +516,36 @@ class GroupReadsByUmi
}
}

// A handful of counters for tracking reads
private var (filteredNonPf, filteredPoorAlignment, filteredNsInUmi, filterUmisTooShort, kept) = (0L, 0L, 0L, 0L, 0L)

override def execute(): Unit = {
Io.assertReadable(input)
Io.assertCanWriteFile(output)
this.familySizeHistogram.foreach(f => Io.assertCanWriteFile(f))

val in = SamSource(input)
val header = in.header
val sorter = Bams.sorter(SamOrder.TemplateCoordinate, header)
val sortProgress = ProgressLogger(logger, verb="Sorted")

// A handful of counters for tracking reads
var (filteredNonPf, filteredPoorAlignment, filteredNsInUmi, filterUmisTooShort, kept) = (0L, 0L, 0L, 0L, 0L)
val in = SamSource(input)
val header = in.header
val skipSorting = SamOrder(header).contains(TemplateCoordinate)

// True if the input will be sorted and no differences in UMIs are tolerated and the Molecular ID tag is MI,
// false otherwise. Pre-sorted (TemplateCoordinate sorted) input does not enable this optimization as we rely on
// copying the `RX` tag into the `MI` tag so that same raw UMIs are grouped together in the sorted stream of records.
//
// True here enables an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI
// thus reducing the number of reads in memory. This is helpful since edits=0 is often used for data that has
// high numbers of reads with the same start/stop coordinates.
// We do this by setting the MI tag to the canonicalized (optionally truncated) UMI prior to sorting, so that
// reads with the same UMI are grouped together in the sorted stream of records.
val canTakeNextGroupByUmi = {
!skipSorting &&
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)
}

// Filter and sort the input BAM file
logger.info("Filtering and sorting input.")
in.iterator
logger.info("Filtering the input.")
val filteredIterator = in.iterator
.filter(r => !r.secondary && !r.supplementary)
.filter(r => (includeNonPfReads || r.pf) || { filteredNonPf += 1; false })
.filter(r => (r.mapped || (r.paired && r.mateMapped)) || { filteredPoorAlignment += 1; false })
Expand All @@ -552,13 +559,13 @@ class GroupReadsByUmi
}
} || { filterUmisTooShort += 1; false}
}
.foreach { r =>
.tapEach { r =>
// If we're able to also group by the UMI because edits aren't allowed, push the trimmed, canonicalized UMI
// into the assign tag (which must be MI if canTakeNextGroupByUmi is true), since that is used by the
// SamOrder to sort the reads _and_ we'll overwrite it on the way out!
// Note that here we trim UMIs (if enabled) to the minimum UMI length for sorting, but that when doing the
// actual grouping later we go back to the raw tag (RX) and use as much of the UMI as possible.
if (this.canTakeNextGroupByUmi) {
if (canTakeNextGroupByUmi) {
val umi = this.assigner.canonicalize(r[String](rawTag).toUpperCase)
val truncated = this.minUmiLength match {
case None => umi
Expand All @@ -568,33 +575,44 @@ class GroupReadsByUmi
r(this.assignTag) = truncated
}

sorter += r
kept += 1
sortProgress.record(r)
r
}

logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
logger.info(f"Filtered out $filteredPoorAlignment%,d reads due to mapping issues.")
logger.info(f"Filtered out $filteredNsInUmi%,d reads that contained one or more Ns in their UMIs.")
this.minUmiLength.foreach { _ => logger.info(f"Filtered out $filterUmisTooShort%,d reads that contained UMIs that were too short.") }

// Output the reads in the new ordering
logger.info("Assigning reads to UMIs and outputting.")
val outHeader = header.clone()
val tagFamilySizeCounter = new NumericCounter[Int]()
val outHeader = header.clone()
SamOrder.TemplateCoordinate.applyTo(outHeader)
val out = SamWriter(output, outHeader)
val out = SamWriter(output, outHeader)
val templateCoordinateIterator = {
// Skip sorting if the input is already template coordinate!
val iter = if (skipSorting) filteredIterator else {
logger.info("Sorting the input to TemplateCoordinate order.")
val sorter = Bams.sorter(SamOrder.TemplateCoordinate, header)
val sortProgress = ProgressLogger(logger, verb="Sorted")
filteredIterator.foreach { rec =>
sortProgress.record(rec)
sorter += rec
}
sortProgress.logLast()
sorter.iterator
}
// Note: this should never have to sort, just groups into templates
Bams.templateIterator(iter, out.header, Bams.MaxInMemory, Io.tmpDir)
}

val iterator = Bams.templateIterator(sorter.iterator, out.header, Bams.MaxInMemory, Io.tmpDir)
val tagFamilySizeCounter = new NumericCounter[Int]()
// If we sorted then log stats now, since the `sorter.iterator` above has consumed all the records fed to it,
// and so the stats should be ready
if (!skipSorting) logStats()

while (iterator.hasNext) {
// Output the reads in the new ordering
logger.info("Assigning reads to UMIs and outputting.")
while (templateCoordinateIterator.hasNext) {
// Take the next set of templates by position and assign UMIs
val templates = takeNextGroup(iterator)
val templates = takeNextGroup(templateCoordinateIterator, canTakeNextGroupByUmi=canTakeNextGroupByUmi)
assignUmiGroups(templates)

// Then output the records in the right order (assigned tag, read name, r1, r2)
val templatesByMi = templates.groupBy { t => t.r1.get[String](this.assignTag) }
val templatesByMi = templates.groupBy { t => t.r1.get.apply[String](this.assignTag) }

templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).foreach(tag => {
templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads).foreach(rec => {
Expand All @@ -605,9 +623,11 @@ class GroupReadsByUmi
// Count up the family sizes
templatesByMi.values.foreach(ps => tagFamilySizeCounter.count(ps.size))
}

out.close()

// If we skipped sorting, log stats after we have consumed all the records
if (skipSorting) logStats()

// Write out the family size histogram
this.familySizeHistogram match {
case None => ()
Expand All @@ -620,8 +640,16 @@ class GroupReadsByUmi
}
}

private def logStats(): Unit = {
logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
logger.info(f"Filtered out $filteredPoorAlignment%,d reads due to mapping issues.")
logger.info(f"Filtered out $filteredNsInUmi%,d reads that contained one or more Ns in their UMIs.")
this.minUmiLength.foreach { _ => logger.info(f"Filtered out $filterUmisTooShort%,d reads that contained UMIs that were too short.") }
}

/** Consumes the next group of templates with all matching end positions and returns them. */
def takeNextGroup(iterator: BufferedIterator[Template]) : Seq[Template] = {
def takeNextGroup(iterator: BufferedIterator[Template], canTakeNextGroupByUmi: Boolean) : Seq[Template] = {
val first = iterator.next()
val firstEnds = ReadInfo(first)
val firstUmi = first.r1.get.apply[String](this.assignTag)
Expand Down Expand Up @@ -697,7 +725,7 @@ class GroupReadsByUmi

if (r1Lower) paired.lowerReadUmiPrefix + ":" + umis(0) + "-" + paired.higherReadUmiPrefix + ":" + umis(1)
else paired.higherReadUmiPrefix + ":" + umis(0) + "-" + paired.lowerReadUmiPrefix + ":" + umis(1)
case (_, _, paired: PairedUmiAssigner) =>
case (_, _, _: PairedUmiAssigner) =>
fail(s"Template ${t.name} has only one read, paired-reads required for paired strategy.")
case (Some(r1), _, _) =>
r1[String](this.rawTag)
Expand Down
66 changes: 37 additions & 29 deletions src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package com.fulcrumgenomics.umi

import com.fulcrumgenomics.bam.Template
import com.fulcrumgenomics.bam.api.SamOrder
import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate
import com.fulcrumgenomics.cmdline.FgBioMain.FailureException
import com.fulcrumgenomics.testing.SamBuilder.{Minus, Plus}
import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec}
Expand Down Expand Up @@ -216,35 +217,42 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

// Test for running the GroupReadsByUmi command line program with some sample input
"GroupReadsByUmi" should "group reads correctly" in {
val builder = new SamBuilder(readLength=100, sort=Some(SamOrder.Coordinate))
builder.addPair(name="a01", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a02", start1=100, start2=300, attrs=Map("RX" -> "AAAAgAAA"))
builder.addPair(name="a03", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a04", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="b01", start1=100, start2=100, unmapped2=true, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="c01", start1=100, start2=300, mapq1=5)

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30).execute()

val groups = readBamRecs(out).groupBy(_.name.charAt(0))

// Group A: 1-4 all passed through into one umi group
groups('a') should have size 4*2
groups('a').map(_.name).toSet shouldEqual Set("a01", "a02", "a03", "a04")
groups('a').map(r => r[String]("MI")).toSet should have size 1

// 5 separated out into another group due to unmapped mate
groups('b') should have size 2
groups('b').map(r => r[String]("MI")).toSet should have size 1
groups('b').map(r => r[String]("MI")).head should not be groups('a').map(r => r[String]("MI")).head

// 6 out for low mapq,
groups.contains('c') shouldBe false

hist.toFile.exists() shouldBe true
Seq(SamOrder.Coordinate, SamOrder.Queryname, SamOrder.TemplateCoordinate).foreach { sortOrder =>
val builder = new SamBuilder(readLength=100, sort=Some(sortOrder))
builder.addPair(name="a01", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a02", start1=100, start2=300, attrs=Map("RX" -> "AAAAgAAA"))
builder.addPair(name="a03", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a04", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="b01", start1=100, start2=100, unmapped2=true, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="c01", start1=100, start2=300, mapq1=5)

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30)
val logs = executeFgbioTool(tool)

val groups = readBamRecs(out).groupBy(_.name.charAt(0))

// Group A: 1-4 all passed through into one umi group
groups('a') should have size 4*2
groups('a').map(_.name).toSet shouldEqual Set("a01", "a02", "a03", "a04")
groups('a').map(r => r[String]("MI")).toSet should have size 1

// 5 separated out into another group due to unmapped mate
groups('b') should have size 2
groups('b').map(r => r[String]("MI")).toSet should have size 1
groups('b').map(r => r[String]("MI")).head should not be groups('a').map(r => r[String]("MI")).head

// 6 out for low mapq,
groups.contains('c') shouldBe false

hist.toFile.exists() shouldBe true

// Make sure that we skip sorting for TemplateCoordinate
val sortMessage = "Sorting the input to TemplateCoordinate order"
logs.exists(_.contains(sortMessage)) shouldBe (sortOrder != TemplateCoordinate)
}
}

it should "correctly group reads with the paired assigner when the two UMIs are the same" in {
Expand Down

0 comments on commit 86e8980

Please sign in to comment.