Skip to content

Commit

Permalink
Tool to downsample a BAM while retaining reads in low coverage areas. (
Browse files Browse the repository at this point in the history
…#893)

* Tool to downsample a BAM while retaining reads in low coverage areas.
  • Loading branch information
tfenne authored Jan 5, 2023
1 parent f72a0c3 commit 53c2ae9
Show file tree
Hide file tree
Showing 4 changed files with 344 additions and 1 deletion.
35 changes: 34 additions & 1 deletion src/main/scala/com/fulcrumgenomics/bam/Bams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.SamPairUtil.PairOrientation
import htsjdk.samtools._
import htsjdk.samtools.reference.ReferenceSequence
import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil}
import htsjdk.samtools.util.{CloserUtil, CoordMath, Murmur3, SequenceUtil}

import java.io.Closeable
import scala.math.{max, min}
Expand Down Expand Up @@ -366,6 +366,39 @@ object Bams extends LazyLogging {
new SelfClosingIterator(_iterator, () => queryIterator.close())
}

/** Returns an iterator over records, grouped into [[Template]] objects, in a random order. Reads are first
* sorted based on a hash of their read name in order to randomize the order while collating by read name.
* The randomized reads are then iterated and grouped into [[Template]] objects.
*
* @param in the [[SamSource]] from which to pull reads and the SAM header
* @param randomSeed a seed used to generate the hashes that drive the random sorting. Different seeds will produce
* different randomizations of the reads. Using the same seed will result in the same ordering
* of the data across invocations.
* @param maxInMemory the maximum number of records to keep and sort in memory, if sorting is needed
* @param tmpDir a temp directory to use for temporary sorting files if sorting is needed
*/
def templateRandomIterator(in: SamSource,
randomSeed: Int = 42,
maxInMemory: Int = Bams.MaxInMemory,
tmpDir: DirPath = Io.tmpDir): Iterator[Template] = {
val hasher = new Murmur3(randomSeed)
val sorter = {
val f = (r: SamRecord) => SamOrder.RandomQueryKey(hasher.hashUnencodedChars(r.name), r.name, r.asSam.getFlags)
new Sorter(maxInMemory, new SamRecordCodec(in.header), f)
}

val sortProgress = ProgressLogger(logger, verb = "sorted", unit = 5e6.toInt)
in.foreach { rec =>
sorter += rec
sortProgress.record()
}

val header = in.header.clone()
SamOrder.RandomQuery.applyTo(header)
Bams.templateIterator(sorter.iterator, header, maxInMemory, tmpDir)
}


/** Returns an iterator over the records in the given iterator such that the order of the records returned is
* determined by the value of the given SAM tag, which can optionally be transformed.
*
Expand Down
186 changes: 186 additions & 0 deletions src/main/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBam.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* The MIT License
*
* Copyright (c) 2023 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.fulcrumgenomics.bam

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.DownsampleAndNormalizeBam.CoverageTracker
import com.fulcrumgenomics.bam.api._
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter}
import htsjdk.samtools.util.IntervalList.IntervalMergerIterator
import htsjdk.samtools.util.{CoordMath, Interval, IntervalList, Murmur3, OverlapDetector}

/**
* Companion object with helper classes.
*/
private object DownsampleAndNormalizeBam extends LazyLogging {
/**
* Tracks coverage over a set of regions.
*/
class CoverageTracker(regions: Seq[Interval],
val targetCoverage: Int,
val minMapQ: Int
) {

// Overlap detector for region => coverage
private val detector = new OverlapDetector[RegionCoverage](0, 0)
regions.foreach { region =>
detector.addLhs(new RegionCoverage(region.getContig, region.getStart, region.getEnd), region)
}

/**
* Checks to see if a template has reads that contribute coverage to any bases that are currently
* below the coverage threshold. If so, coverage from all acceptable reads is accumulated and the
* method returns true. Otherwise no coverage is accumulated and the method returns false.
*
* @param t a Template to accumulate coverage from if it covers any bases that are under the coverage threshold
* @return true if the template added coverage, false otherwise
*/
def add(t: Template): Boolean = {
val recs = t.allReads
.filterNot(_.secondary)
.filterNot(_.unmapped)
.filterNot(_.duplicate)
.filter(_.mapq >= minMapQ)
.toSeq

val addsCoverage = recs.exists { rec =>
detector.getOverlaps(rec.asSam).iterator().exists { cov =>
Range.inclusive(math.max(rec.start, cov.start), math.min(rec.end, cov.end)).exists { pos =>
cov(pos) < targetCoverage
}
}
}

if (addsCoverage) {
val intervals = {
val iter = recs
.sortBy(r => (r.refName, r.start, r.end))
.iterator
.map(r => new Interval(r.refName, r.start, r.end))
new IntervalMergerIterator(iter, true, false, false)
}

for (interval <- intervals; cov <- detector.getOverlaps(interval).iterator()) {
cov.add(interval.getStart, interval.getEnd)
}
}

addsCoverage
}
}

/**
* Coverage counter for a region - all coordinates in the public API are 1-based closed ended.
*/
class RegionCoverage(val chrom: String, val start: Int, val end: Int) {
private val offset = start
private val counts = new Array[Int](CoordMath.getLength(start, end))

/** Returns the coverage at the given genomic position, or -1 if the position is not between start:end. */
def apply(i: Int): Int = {
if (i < start || i > end) {
throw new IndexOutOfBoundsException(s"Position $i is outside of range for region $chrom:$start-$end.")
}

this.counts(i - offset)
}

/** Adds 1 to the coverage counts for all bases between from:to inclusive. */
def add(from: Int, to: Int): Unit = {
forloop (from=math.max(0, from-offset), until=math.min(counts.length, to-offset+1)) { i =>
this.counts(i) += 1
}
}
}
}

@clp(
group=ClpGroups.SamOrBam,
description =
"""
|Downsamples a BAM in a biased way to a uniform coverage across regions.
|
|Attempts to downsample a BAM such that every base in the genome (or in the target `regions` if provided)
|is covered by at least `coverage` reads. When computing coverage:
| - Reads marked as secondary, duplicate or unmapped are not used
| - A base can receive coverage from only one read with the same queryname (i.e. mate overlaps are not counted)
| - Coverage is counted if a read _spans_ a base, even if that base is deleted in the read
|
|Reads are first sorted into a random order (by hashing read names). Reads are then consumed one template
|at a time, and if any read adds coverage to base that is under the target coverage, _all_ reads (including
|secondary, unmapped, etc.) for that template are emitted into the output.
|
|Given the procedure used for downsampling, it is likely the output BAM will have coverage up to 2X the requested
|coverage at regions in the input BAM that are i) well covered and ii) are close to regions that are poorly
|covered.
"""
)
class DownsampleAndNormalizeBam
( @arg(flag='i', doc="Input SAM or BAM file.") val input: PathToBam,
@arg(flag='o', doc="Output SAM or BAM file.") val output: PathToBam,
@arg(flag='c', doc="Desired minimum coverage.") val coverage: Int,
@arg(flag='m', doc="Minimum mapping quality to count a read as covering.") val minMapQ: Int = 0,
@arg(flag='s', doc="Random seed to use when randomizing order of reads/templates.") val seed: Int = 42,
@arg(flag='l', doc="Optional set of regions for coverage targeting.") val regions: Option[PathToIntervals] = None,
@arg(flag='M', doc="Maximum records to be held in memory while sorting.") val maxInMemory: Int = Bams.MaxInMemory
) extends FgBioTool {

Io.assertReadable(input)
Io.assertCanWriteFile(output)
regions.foreach(Io.assertReadable)

override def execute(): Unit = {
val in = SamSource(input)
val out = SamWriter(output, in.header, sort=SamOrder(in.header))

val targets: IndexedSeq[Interval] = regions match {
case Some(rs) => IntervalList.fromPath(rs).uniqued().getIntervals.toIndexedSeq
case None => in.dict.map(s => new Interval(s.name, 1, s.length)).toIndexedSeq
}

val tracker = new CoverageTracker(targets, this.coverage, this.minMapQ)
var (read, written) = (0L, 0L)

Bams.templateRandomIterator(in, this.seed, maxInMemory).foreach { template =>
read += 1

if (tracker.add(template)) {
out ++= template.allReads
written += 1
}

if (read % 1000000 == 0) {
logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d")
}
}

in.safelyClose()
out.close()
logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d")
}
}
20 changes: 20 additions & 0 deletions src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,26 @@ class BamsTest extends UnitSpec {
}
}

"Bams.templateRandomIterator" should "return template objects in a random order" in {
val builder = new SamBuilder(sort = Some(SamOrder.Coordinate))
builder.addPair(name = "p1", contig=0, start1 = 100, start2 = 500)
builder.addPair(name = "p2", contig=0, start1 = 200, start2 = 600)
builder.addPair(name = "p3", contig=0, start1 = 300, start2 = 700)
builder.addPair(name = "p4", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p5", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p6", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p7", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p8", contig=0, start1 = 400, start2 = 800)

val ts1 = Bams.templateRandomIterator(builder.toSource, randomSeed=1).toIndexedSeq
val ts2 = Bams.templateRandomIterator(builder.toSource, randomSeed=12345).toIndexedSeq

ts1 should have size 8
ts2 should have size 8
ts1.map(_.name).sorted shouldBe ts2.map(_.name).sorted
ts1.map(_.name) should not be ts2.map(_.name)
}

"Bams.regenerateNmUqMdTags" should "null out fields on unmapped reads" in {
val builder = new SamBuilder(sort=Some(SamOrder.Coordinate))
val rec = builder.addFrag(unmapped=true).get
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* The MIT License
*
* Copyright (c) 2023 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.fulcrumgenomics.bam

import com.fulcrumgenomics.bam.api.{SamOrder, SamSource}
import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.pileup.PileupBuilder
import com.fulcrumgenomics.fasta.{SequenceDictionary, SequenceMetadata}
import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec}

class DownsampleAndNormalizeBamTest extends UnitSpec {
def newBuilder(readLength: Int = 50): SamBuilder = {
/** Makes a new builder with two short chromosomes. */
new SamBuilder(
readLength = readLength,
sort = Some(SamOrder.Coordinate),
sd = Some(SequenceDictionary(
SequenceMetadata(name="chr1", length=1000, index=0),
SequenceMetadata(name="chr2", length=1000, index=0),
))
)
}

/** Generates a temp file and returns its path. */
def newBam(): PathToBam = makeTempFile("downsampled.", ".bam")


"DownsampleAndNormalizeBam" should "run ok on an empty bam" in {
val empty = newBuilder().toTempFile()
val out = newBam()
new DownsampleAndNormalizeBam(input=empty, output=out, coverage=100).execute()
val recs = SamSource(out).toIndexedSeq
recs.isEmpty shouldBe true
}

it should "downsample a BAM with excess coverage" in {
val builder = newBuilder()
val out = newBam()
val coverage = 30

// Add lots of reads - 25 read pairs starting at each position until we run out of room
builder.dict.foreach { chrom =>
Range.inclusive(1, chrom.length - 125).foreach { start =>
forloop (from=0, until=5) { _ =>
builder.addPair(contig=chrom.index, start1=start , start2=start+75)
}
}
}

new DownsampleAndNormalizeBam(input = builder.toTempFile(), output = out, coverage = coverage).execute()

val pileIn = PileupBuilder(source=builder.toSource, accessPattern=PileupBuilder.BamAccessPattern.Streaming)
val pileOut = PileupBuilder(source=SamSource(out), accessPattern=PileupBuilder.BamAccessPattern.Streaming)
var totalIn = 0L
var totalOut = 0L

builder.dict.foreach { chrom =>
Range.inclusive(1, chrom.length).foreach { pos =>
val covIn = pileIn.pileup(chrom.name, pos).withoutOverlaps.depth
val covOut = pileOut.pileup(chrom.name, pos).withoutOverlaps.depth
totalIn += covIn
totalOut += covOut


covOut <= covIn shouldBe true
if (covIn < coverage) {
covOut shouldBe covIn
}
else {
covOut should be >= coverage
}
}
}

val meanIn = totalIn / builder.dict.sumBy(_.length).toDouble
val meanOut = totalOut / builder.dict.sumBy(_.length).toDouble

meanOut should be < meanIn
meanOut should be < (coverage * 2).toDouble
meanOut should be > coverage.toDouble
}
}

0 comments on commit 53c2ae9

Please sign in to comment.