Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroupReadsByUmi only sort input if it is not TemplateCoordinate sorted #794

Merged
merged 1 commit into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 73 additions & 45 deletions src/main/scala/com/fulcrumgenomics/umi/GroupReadsByUmi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
package com.fulcrumgenomics.umi

import java.util.concurrent.atomic.AtomicLong

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate
import com.fulcrumgenomics.bam.{Bams, Template}
import com.fulcrumgenomics.bam.api.{SamOrder, SamRecord, SamSource, SamWriter}
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
Expand Down Expand Up @@ -102,8 +102,8 @@ object GroupReadsByUmi {

val r1Earlier =
(ref1 < ref2) ||
(ref1 == ref2 && start1 < start2) ||
(ref1 == ref2 && start1 == start2 && strand1 < strand2)
(ref1 == ref2 && start1 < start2) ||
(ref1 == ref2 && start1 == start2 && strand1 < strand2)

if (r1Earlier) new ReadInfo(ref1, start1, strand1, ref2, start2, strand2, lib)
else new ReadInfo(ref2, start2, strand2, ref1, start1, strand1, lib)
Expand Down Expand Up @@ -381,7 +381,7 @@ object GroupReadsByUmi {
* the main assignment method to filter out the ones that shouldn't be in the Map.
*/
override def assign(rawUmis: Seq[Umi]): Map[Umi, MoleculeId] = {
super.assign(rawUmis).filter { case (umi, id) => rawUmis.contains(umi) }
super.assign(rawUmis).filter { case (umi, _) => rawUmis.contains(umi) }
}
}
}
Expand Down Expand Up @@ -469,6 +469,9 @@ object Strategy extends FgBioEnum[Strategy] {
|compared, where `len` is the length of the shortest UMI. The UMI length is the number of [ACGT] bases in the UMI
|(i.e. does not count dashes and other non-ACGT characters). This option is not implemented for reads with UMI pairs
|(i.e. using the paired assigner).
|
|If the input is not template-coordinate sorted (i.e. `SO:unsorted GO:query SS:unsorted:template-coordinate`), then
|this tool will re-sort the input. The ouitput will be written in template-coordinate order.
"""
)
class GroupReadsByUmi
Expand Down Expand Up @@ -496,16 +499,6 @@ class GroupReadsByUmi

private val assigner = strategy.newStrategy(this.edits)

/** True if no differences in UMIs are tolerated and the Molecular ID tag is MI, false otherwise. True here enables
* an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI thus
* reducing the number of reads in memory. This is helpful since edits=0 is often used for data that has
* high numbers of reads with the same start/stop coordinates.
* We do this by setting the MI tag to the canonicalized (optionally truncated) UMI prior to sorting, so that
* reads with the same UMI are grouped together in the sorted stream of records.
*/
private val canTakeNextGroupByUmi =
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)

/** Checks that the read's mapq is over a minimum, and if the read is paired, that the mate mapq is also over the min. */
private def mapqOk(rec: SamRecord, minMapQ: Int): Boolean = {
Expand All @@ -523,22 +516,36 @@ class GroupReadsByUmi
}
}

// A handful of counters for tracking reads
nh13 marked this conversation as resolved.
Show resolved Hide resolved
private var (filteredNonPf, filteredPoorAlignment, filteredNsInUmi, filterUmisTooShort, kept) = (0L, 0L, 0L, 0L, 0L)

override def execute(): Unit = {
Io.assertReadable(input)
Io.assertCanWriteFile(output)
this.familySizeHistogram.foreach(f => Io.assertCanWriteFile(f))

val in = SamSource(input)
val header = in.header
val sorter = Bams.sorter(SamOrder.TemplateCoordinate, header)
val sortProgress = ProgressLogger(logger, verb="Sorted")

// A handful of counters for tracking reads
var (filteredNonPf, filteredPoorAlignment, filteredNsInUmi, filterUmisTooShort, kept) = (0L, 0L, 0L, 0L, 0L)
val in = SamSource(input)
val header = in.header
val skipSorting = SamOrder(header).contains(TemplateCoordinate)

// True if the input will be sorted and no differences in UMIs are tolerated and the Molecular ID tag is MI,
// false otherwise. Pre-sorted (TemplateCoordinate sorted) input does not enable this optimization as we rely on
// copying the `RX` tag into the `MI` tag so that same raw UMIs are grouped together in the sorted stream of records.
//
// True here enables an optimization where, when bringing groups of reads into memory, we can _also_ group by UMI
// thus reducing the number of reads in memory. This is helpful since edits=0 is often used for data that has
// high numbers of reads with the same start/stop coordinates.
// We do this by setting the MI tag to the canonicalized (optionally truncated) UMI prior to sorting, so that
// reads with the same UMI are grouped together in the sorted stream of records.
val canTakeNextGroupByUmi = {
!skipSorting &&
(this.assignTag == ConsensusTags.MolecularId) &&
(this.edits == 0 || this.strategy == Strategy.Identity)
}

// Filter and sort the input BAM file
logger.info("Filtering and sorting input.")
in.iterator
logger.info("Filtering the input.")
val filteredIterator = in.iterator
.filter(r => !r.secondary && !r.supplementary)
.filter(r => (includeNonPfReads || r.pf) || { filteredNonPf += 1; false })
.filter(r => (r.mapped || (r.paired && r.mateMapped)) || { filteredPoorAlignment += 1; false })
Expand All @@ -552,13 +559,13 @@ class GroupReadsByUmi
}
} || { filterUmisTooShort += 1; false}
}
.foreach { r =>
.tapEach { r =>
// If we're able to also group by the UMI because edits aren't allowed, push the trimmed, canonicalized UMI
// into the assign tag (which must be MI if canTakeNextGroupByUmi is true), since that is used by the
// SamOrder to sort the reads _and_ we'll overwrite it on the way out!
// Note that here we trim UMIs (if enabled) to the minimum UMI length for sorting, but that when doing the
// actual grouping later we go back to the raw tag (RX) and use as much of the UMI as possible.
if (this.canTakeNextGroupByUmi) {
if (canTakeNextGroupByUmi) {
val umi = this.assigner.canonicalize(r[String](rawTag).toUpperCase)
val truncated = this.minUmiLength match {
case None => umi
Expand All @@ -568,33 +575,44 @@ class GroupReadsByUmi
r(this.assignTag) = truncated
}

sorter += r
kept += 1
sortProgress.record(r)
r
}

logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
logger.info(f"Filtered out $filteredPoorAlignment%,d reads due to mapping issues.")
logger.info(f"Filtered out $filteredNsInUmi%,d reads that contained one or more Ns in their UMIs.")
this.minUmiLength.foreach { _ => logger.info(f"Filtered out $filterUmisTooShort%,d reads that contained UMIs that were too short.") }

// Output the reads in the new ordering
logger.info("Assigning reads to UMIs and outputting.")
val outHeader = header.clone()
val tagFamilySizeCounter = new NumericCounter[Int]()
val outHeader = header.clone()
SamOrder.TemplateCoordinate.applyTo(outHeader)
val out = SamWriter(output, outHeader)
val out = SamWriter(output, outHeader)
val templateCoordinateIterator = {
// Skip sorting if the input is already template coordinate!
val iter = if (skipSorting) filteredIterator else {
logger.info("Sorting the input to TemplateCoordinate order.")
val sorter = Bams.sorter(SamOrder.TemplateCoordinate, header)
val sortProgress = ProgressLogger(logger, verb="Sorted")
filteredIterator.foreach { rec =>
sortProgress.record(rec)
sorter += rec
}
sortProgress.logLast()
sorter.iterator
}
// Note: this should never have to sort, just groups into templates
Bams.templateIterator(iter, out.header, Bams.MaxInMemory, Io.tmpDir)
}

val iterator = Bams.templateIterator(sorter.iterator, out.header, Bams.MaxInMemory, Io.tmpDir)
val tagFamilySizeCounter = new NumericCounter[Int]()
// If we sorted then log stats now, since the `sorter.iterator` above has consumed all the records fed to it,
// and so the stats should be ready
if (!skipSorting) logStats()

while (iterator.hasNext) {
// Output the reads in the new ordering
logger.info("Assigning reads to UMIs and outputting.")
while (templateCoordinateIterator.hasNext) {
// Take the next set of templates by position and assign UMIs
val templates = takeNextGroup(iterator)
val templates = takeNextGroup(templateCoordinateIterator, canTakeNextGroupByUmi=canTakeNextGroupByUmi)
assignUmiGroups(templates)

// Then output the records in the right order (assigned tag, read name, r1, r2)
val templatesByMi = templates.groupBy { t => t.r1.get[String](this.assignTag) }
val templatesByMi = templates.groupBy { t => t.r1.get.apply[String](this.assignTag) }

templatesByMi.keys.toSeq.sortBy(id => (id.length, id)).foreach(tag => {
templatesByMi(tag).sortBy(t => t.name).flatMap(t => t.primaryReads).foreach(rec => {
Expand All @@ -605,9 +623,11 @@ class GroupReadsByUmi
// Count up the family sizes
templatesByMi.values.foreach(ps => tagFamilySizeCounter.count(ps.size))
}

out.close()

// If we skipped sorting, log stats after we have consumed all the records
if (skipSorting) logStats()

// Write out the family size histogram
this.familySizeHistogram match {
case None => ()
Expand All @@ -620,8 +640,16 @@ class GroupReadsByUmi
}
}

private def logStats(): Unit = {
logger.info(f"Accepted $kept%,d reads for grouping.")
if (filteredNonPf > 0) logger.info(f"Filtered out $filteredNonPf%,d non-PF reads.")
logger.info(f"Filtered out $filteredPoorAlignment%,d reads due to mapping issues.")
logger.info(f"Filtered out $filteredNsInUmi%,d reads that contained one or more Ns in their UMIs.")
this.minUmiLength.foreach { _ => logger.info(f"Filtered out $filterUmisTooShort%,d reads that contained UMIs that were too short.") }
}

/** Consumes the next group of templates with all matching end positions and returns them. */
def takeNextGroup(iterator: BufferedIterator[Template]) : Seq[Template] = {
def takeNextGroup(iterator: BufferedIterator[Template], canTakeNextGroupByUmi: Boolean) : Seq[Template] = {
val first = iterator.next()
val firstEnds = ReadInfo(first)
val firstUmi = first.r1.get.apply[String](this.assignTag)
Expand Down Expand Up @@ -697,7 +725,7 @@ class GroupReadsByUmi

if (r1Lower) paired.lowerReadUmiPrefix + ":" + umis(0) + "-" + paired.higherReadUmiPrefix + ":" + umis(1)
else paired.higherReadUmiPrefix + ":" + umis(0) + "-" + paired.lowerReadUmiPrefix + ":" + umis(1)
case (_, _, paired: PairedUmiAssigner) =>
case (_, _, _: PairedUmiAssigner) =>
fail(s"Template ${t.name} has only one read, paired-reads required for paired strategy.")
case (Some(r1), _, _) =>
r1[String](this.rawTag)
Expand Down
66 changes: 37 additions & 29 deletions src/test/scala/com/fulcrumgenomics/umi/GroupReadsByUmiTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package com.fulcrumgenomics.umi

import com.fulcrumgenomics.bam.Template
import com.fulcrumgenomics.bam.api.SamOrder
import com.fulcrumgenomics.bam.api.SamOrder.TemplateCoordinate
import com.fulcrumgenomics.cmdline.FgBioMain.FailureException
import com.fulcrumgenomics.testing.SamBuilder.{Minus, Plus}
import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec}
Expand Down Expand Up @@ -216,35 +217,42 @@ class GroupReadsByUmiTest extends UnitSpec with OptionValues with PrivateMethodT

// Test for running the GroupReadsByUmi command line program with some sample input
"GroupReadsByUmi" should "group reads correctly" in {
val builder = new SamBuilder(readLength=100, sort=Some(SamOrder.Coordinate))
builder.addPair(name="a01", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a02", start1=100, start2=300, attrs=Map("RX" -> "AAAAgAAA"))
builder.addPair(name="a03", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a04", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="b01", start1=100, start2=100, unmapped2=true, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="c01", start1=100, start2=300, mapq1=5)

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30).execute()

val groups = readBamRecs(out).groupBy(_.name.charAt(0))

// Group A: 1-4 all passed through into one umi group
groups('a') should have size 4*2
groups('a').map(_.name).toSet shouldEqual Set("a01", "a02", "a03", "a04")
groups('a').map(r => r[String]("MI")).toSet should have size 1

// 5 separated out into another group due to unmapped mate
groups('b') should have size 2
groups('b').map(r => r[String]("MI")).toSet should have size 1
groups('b').map(r => r[String]("MI")).head should not be groups('a').map(r => r[String]("MI")).head

// 6 out for low mapq,
groups.contains('c') shouldBe false

hist.toFile.exists() shouldBe true
Seq(SamOrder.Coordinate, SamOrder.Queryname, SamOrder.TemplateCoordinate).foreach { sortOrder =>
val builder = new SamBuilder(readLength=100, sort=Some(sortOrder))
builder.addPair(name="a01", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a02", start1=100, start2=300, attrs=Map("RX" -> "AAAAgAAA"))
builder.addPair(name="a03", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAA"))
builder.addPair(name="a04", start1=100, start2=300, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="b01", start1=100, start2=100, unmapped2=true, attrs=Map("RX" -> "AAAAAAAt"))
builder.addPair(name="c01", start1=100, start2=300, mapq1=5)

val in = builder.toTempFile()
val out = Files.createTempFile("umi_grouped.", ".sam")
val hist = Files.createTempFile("umi_grouped.", ".histogram.txt")
val tool = new GroupReadsByUmi(input=in, output=out, familySizeHistogram=Some(hist), rawTag="RX", assignTag="MI", strategy=Strategy.Edit, edits=1, minMapQ=30)
val logs = executeFgbioTool(tool)

val groups = readBamRecs(out).groupBy(_.name.charAt(0))

// Group A: 1-4 all passed through into one umi group
groups('a') should have size 4*2
groups('a').map(_.name).toSet shouldEqual Set("a01", "a02", "a03", "a04")
groups('a').map(r => r[String]("MI")).toSet should have size 1

// 5 separated out into another group due to unmapped mate
groups('b') should have size 2
groups('b').map(r => r[String]("MI")).toSet should have size 1
groups('b').map(r => r[String]("MI")).head should not be groups('a').map(r => r[String]("MI")).head

// 6 out for low mapq,
groups.contains('c') shouldBe false

hist.toFile.exists() shouldBe true

// Make sure that we skip sorting for TemplateCoordinate
val sortMessage = "Sorting the input to TemplateCoordinate order"
logs.exists(_.contains(sortMessage)) shouldBe (sortOrder != TemplateCoordinate)
}
}

it should "correctly group reads with the paired assigner when the two UMIs are the same" in {
Expand Down