Tool to downsample a BAM while retaining reads in low coverage areas. #893

Merged
merged 3 commits into from
Jan 5, 2023
35 changes: 34 additions & 1 deletion src/main/scala/com/fulcrumgenomics/bam/Bams.scala
@@ -36,7 +36,7 @@ import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder}
import htsjdk.samtools.SamPairUtil.PairOrientation
import htsjdk.samtools._
import htsjdk.samtools.reference.ReferenceSequence
import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil}
import htsjdk.samtools.util.{CloserUtil, CoordMath, Murmur3, SequenceUtil}

import java.io.Closeable
import scala.math.{max, min}
@@ -366,6 +366,39 @@ object Bams extends LazyLogging {
new SelfClosingIterator(_iterator, () => queryIterator.close())
}

/** Returns an iterator over records, grouped into [[Template]] objects, in a random order. Reads are first
* sorted based on a hash of their read name in order to randomize the order while collating by read name.
* The randomized reads are then iterated and grouped into [[Template]] objects.
*
* @param in the [[SamSource]] from which to pull reads and the SAM header
* @param randomSeed a seed used to generate the hashes that drive the random sorting. Different seeds will produce
* different randomizations of the reads. Using the same seed will result in the same ordering
* of the data across invocations.
* @param maxInMemory the maximum number of records to keep and sort in memory, if sorting is needed
* @param tmpDir a temp directory to use for temporary sorting files if sorting is needed
*/
def templateRandomIterator(in: SamSource,
randomSeed: Int = 42,
maxInMemory: Int = Bams.MaxInMemory,
tmpDir: DirPath = Io.tmpDir): Iterator[Template] = {
val hasher = new Murmur3(randomSeed)
val sorter = {
val f = (r: SamRecord) => SamOrder.RandomQueryKey(hasher.hashUnencodedChars(r.name), r.name, r.asSam.getFlags)
new Sorter(maxInMemory, new SamRecordCodec(in.header), f)
}

val sortProgress = ProgressLogger(logger, verb = "sorted", unit = 5e6.toInt)
in.foreach { rec =>
sorter += rec
sortProgress.record()
}

val header = in.header.clone()
SamOrder.RandomQuery.applyTo(header)
Bams.templateIterator(sorter.iterator, header, maxInMemory, tmpDir)
}


/** Returns an iterator over the records in the given iterator such that the order of the records returned is
* determined by the value of the given SAM tag, which can optionally be transformed.
*
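Before moving to the new tool, here is a minimal sketch of how the `templateRandomIterator` added above might be invoked. The input path is a placeholder and the example is not part of the PR; it only exercises the signature shown in the diff.

```scala
import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.Bams
import com.fulcrumgenomics.bam.api.SamSource
import java.nio.file.Paths

// Hypothetical input BAM; any input works since the iterator re-sorts
// records by a hash of the read name before grouping into templates.
val in = SamSource(Paths.get("example.bam"))

// The same seed yields the same "random" template order across invocations.
Bams.templateRandomIterator(in, randomSeed = 42, maxInMemory = Bams.MaxInMemory)
  .take(5)
  .foreach(t => println(s"${t.name}: ${t.allReads.size} reads"))

in.safelyClose()
```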
186 changes: 186 additions & 0 deletions src/main/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBam.scala
@@ -0,0 +1,186 @@
/*
* The MIT License
*
* Copyright (c) 2023 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.fulcrumgenomics.bam

import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.DownsampleAndNormalizeBam.CoverageTracker
import com.fulcrumgenomics.bam.api._
import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool}
import com.fulcrumgenomics.commons.util.LazyLogging
import com.fulcrumgenomics.sopt.{arg, clp}
import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter}
import htsjdk.samtools.util.IntervalList.IntervalMergerIterator
import htsjdk.samtools.util.{CoordMath, Interval, IntervalList, Murmur3, OverlapDetector}

/**
* Companion object with helper classes.
*/
private object DownsampleAndNormalizeBam extends LazyLogging {
/**
* Tracks coverage over a set of regions.
*/
class CoverageTracker(regions: Seq[Interval],
val targetCoverage: Int,
val minMapQ: Int
) {

// Overlap detector for region => coverage
private val detector = new OverlapDetector[RegionCoverage](0, 0)
regions.foreach { region =>
detector.addLhs(new RegionCoverage(region.getContig, region.getStart, region.getEnd), region)
}

/**
* Checks to see if a template has reads that contribute coverage to any bases that are currently
* below the coverage threshold. If so, coverage from all acceptable reads is accumulated and the
* method returns true. Otherwise no coverage is accumulated and the method returns false.
*
* @param t a Template to accumulate coverage from if it covers any bases that are under the coverage threshold
* @return true if the template added coverage, false otherwise
*/
def add(t: Template): Boolean = {
val recs = t.allReads
.filterNot(_.secondary)
Member commented: This made me think about supplementary reads. Should those be upgraded to "primary" alignments or be filtered out as well? I'm thinking of a region where we have a large number of both primary and supplementary alignments, and how to pick there.

Member (author) replied: I definitely want to include supplementary reads, they're just part of a split-read alignment and therefore contribute to coverage. E.g. imagine an alignment to a circular genome where anything that maps over the break point will have a primary+supplementary, we want to count both of those.

.filterNot(_.unmapped)
.filterNot(_.duplicate)
.filter(_.mapq >= minMapQ)
.toSeq

val addsCoverage = recs.exists { rec =>
detector.getOverlaps(rec.asSam).iterator().exists { cov =>
Member commented: Perhaps it's time to have getOverlaps return an iterator itself?

Member (author) replied: I think perhaps it's time to move on from HTSJDK's OverlapDetector and implement one in scala that suits our needs better :/ But not today.

Range.inclusive(math.max(rec.start, cov.start), math.min(rec.end, cov.end)).exists { pos =>
cov(pos) < targetCoverage
}
}
}

if (addsCoverage) {
val intervals = {
val iter = recs
.sortBy(r => (r.refName, r.start, r.end))
.iterator
.map(r => new Interval(r.refName, r.start, r.end))
new IntervalMergerIterator(iter, true, false, false)
Member commented: Sad we can't call arguments by name, since what are the three booleans here?

Member (author) replied: Agreed. For reference they are:
  • combineAbutting
  • enforceSameStrand
  • concatenateNames
}

for (interval <- intervals; cov <- detector.getOverlaps(interval).iterator()) {
cov.add(interval.getStart, interval.getEnd)
}
}

addsCoverage
}
}

/**
* Coverage counter for a region - all coordinates in the public API are 1-based closed ended.
*/
class RegionCoverage(val chrom: String, val start: Int, val end: Int) {
private val offset = start
private val counts = new Array[Int](CoordMath.getLength(start, end))

/** Returns the coverage at the given genomic position; throws an IndexOutOfBoundsException if the position is outside of start:end. */
def apply(i: Int): Int = {
if (i < start || i > end) {
throw new IndexOutOfBoundsException(s"Position $i is outside of range for region $chrom:$start-$end.")
}

this.counts(i - offset)
}

/** Adds 1 to the coverage counts for all bases between from:to inclusive. */
def add(from: Int, to: Int): Unit = {
forloop (from=math.max(0, from-offset), until=math.min(counts.length, to-offset+1)) { i =>
this.counts(i) += 1
}
}
}
}
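To make the 1-based, closed-ended coordinate convention of `RegionCoverage` concrete, a small illustrative sketch follows. The class is private to this file, so this is exposition only and not part of the change:

```scala
// Illustration only: RegionCoverage is file-private in DownsampleAndNormalizeBam.scala.
val cov = new RegionCoverage(chrom = "chr1", start = 100, end = 200)
cov.add(150, 160)        // adds 1 to coverage at positions 150..160, inclusive
require(cov(155) == 1)   // position inside the incremented interval
require(cov(149) == 0)   // inside the region, but no coverage added yet
// cov(99) would throw an IndexOutOfBoundsException: outside of chr1:100-200
```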

@clp(
group=ClpGroups.SamOrBam,
description =
"""
|Downsamples a BAM in a biased way to a uniform coverage across regions.
|
|Attempts to downsample a BAM such that every base in the genome (or in the target `regions` if provided)
|is covered by at least `coverage` reads. When computing coverage:
| - Reads marked as secondary, duplicate or unmapped are not used
| - A base can receive coverage from only one read with the same queryname (i.e. mate overlaps are not counted)
| - Coverage is counted if a read _spans_ a base, even if that base is deleted in the read
|
|Reads are first sorted into a random order (by hashing read names). Reads are then consumed one template
|at a time, and if any read adds coverage to a base that is under the target coverage, _all_ reads (including
|secondary, unmapped, etc.) for that template are emitted into the output.
|
|Given the procedure used for downsampling, it is likely the output BAM will have coverage up to 2X the requested
|coverage at regions in the input BAM that are i) well covered and ii) close to regions that are poorly
|covered.
"""
)
class DownsampleAndNormalizeBam
( @arg(flag='i', doc="Input SAM or BAM file.") val input: PathToBam,
@arg(flag='o', doc="Output SAM or BAM file.") val output: PathToBam,
@arg(flag='c', doc="Desired minimum coverage.") val coverage: Int,
@arg(flag='m', doc="Minimum mapping quality to count a read as covering.") val minMapQ: Int = 0,
@arg(flag='s', doc="Random seed to use when randomizing order of reads/templates.") val seed: Int = 42,
@arg(flag='l', doc="Optional set of regions for coverage targeting.") val regions: Option[PathToIntervals] = None,
@arg(flag='M', doc="Maximum records to be held in memory while sorting.") val maxInMemory: Int = Bams.MaxInMemory
) extends FgBioTool {

Io.assertReadable(input)
Io.assertCanWriteFile(output)
regions.foreach(Io.assertReadable)

override def execute(): Unit = {
val in = SamSource(input)
val out = SamWriter(output, in.header, sort=SamOrder(in.header))

val targets: IndexedSeq[Interval] = regions match {
case Some(rs) => IntervalList.fromPath(rs).uniqued().getIntervals.toIndexedSeq
case None => in.dict.map(s => new Interval(s.name, 1, s.length)).toIndexedSeq
}

val tracker = new CoverageTracker(targets, this.coverage, this.minMapQ)
var (read, written) = (0L, 0L)

Bams.templateRandomIterator(in, this.seed, maxInMemory).foreach { template =>
read += 1

if (tracker.add(template)) {
out ++= template.allReads
written += 1
}

if (read % 1000000 == 0) {
logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d")
}
}

in.safelyClose()
out.close()
logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d")
}
}
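For reference, a hedged usage sketch of the new tool. The paths are placeholders, and the command line in the comment simply mirrors the `@arg` flags declared above:

```scala
import java.nio.file.Paths

// Roughly equivalent to (flags taken from the @arg annotations above):
//   fgbio DownsampleAndNormalizeBam -i in.bam -o downsampled.bam -c 30 -m 20
new DownsampleAndNormalizeBam(
  input    = Paths.get("in.bam"),
  output   = Paths.get("downsampled.bam"),
  coverage = 30,
  minMapQ  = 20
).execute()
```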
20 changes: 20 additions & 0 deletions src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala
@@ -218,6 +218,26 @@ class BamsTest extends UnitSpec {
}
}

"Bams.templateRandomIterator" should "return template objects in a random order" in {
val builder = new SamBuilder(sort = Some(SamOrder.Coordinate))
builder.addPair(name = "p1", contig=0, start1 = 100, start2 = 500)
builder.addPair(name = "p2", contig=0, start1 = 200, start2 = 600)
builder.addPair(name = "p3", contig=0, start1 = 300, start2 = 700)
builder.addPair(name = "p4", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p5", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p6", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p7", contig=0, start1 = 400, start2 = 800)
builder.addPair(name = "p8", contig=0, start1 = 400, start2 = 800)

val ts1 = Bams.templateRandomIterator(builder.toSource, randomSeed=1).toIndexedSeq
val ts2 = Bams.templateRandomIterator(builder.toSource, randomSeed=12345).toIndexedSeq

ts1 should have size 8
ts2 should have size 8
ts1.map(_.name).sorted shouldBe ts2.map(_.name).sorted
ts1.map(_.name) should not be ts2.map(_.name)
}

"Bams.regenerateNmUqMdTags" should "null out fields on unmapped reads" in {
val builder = new SamBuilder(sort=Some(SamOrder.Coordinate))
val rec = builder.addFrag(unmapped=true).get
104 changes: 104 additions & 0 deletions src/test/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBamTest.scala
@@ -0,0 +1,104 @@
/*
* The MIT License
*
* Copyright (c) 2023 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package com.fulcrumgenomics.bam

import com.fulcrumgenomics.bam.api.{SamOrder, SamSource}
import com.fulcrumgenomics.FgBioDef._
import com.fulcrumgenomics.bam.pileup.PileupBuilder
import com.fulcrumgenomics.fasta.{SequenceDictionary, SequenceMetadata}
import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec}

class DownsampleAndNormalizeBamTest extends UnitSpec {
def newBuilder(readLength: Int = 50): SamBuilder = {
/** Makes a new builder with two short chromosomes. */
new SamBuilder(
readLength = readLength,
sort = Some(SamOrder.Coordinate),
sd = Some(SequenceDictionary(
SequenceMetadata(name="chr1", length=1000, index=0),
SequenceMetadata(name="chr2", length=1000, index=0),
))
)
}

/** Generates a temp file and returns its path. */
def newBam(): PathToBam = makeTempFile("downsampled.", ".bam")


"DownsampleAndNormalizeBam" should "run ok on an empty bam" in {
val empty = newBuilder().toTempFile()
val out = newBam()
new DownsampleAndNormalizeBam(input=empty, output=out, coverage=100).execute()
val recs = SamSource(out).toIndexedSeq
recs.isEmpty shouldBe true
}

it should "downsample a BAM with excess coverage" in {
val builder = newBuilder()
val out = newBam()
val coverage = 30

// Add lots of reads - 5 read pairs starting at each position until we run out of room
builder.dict.foreach { chrom =>
Range.inclusive(1, chrom.length - 125).foreach { start =>
forloop (from=0, until=5) { _ =>
builder.addPair(contig=chrom.index, start1=start , start2=start+75)
}
}
}

new DownsampleAndNormalizeBam(input = builder.toTempFile(), output = out, coverage = coverage).execute()

val pileIn = PileupBuilder(source=builder.toSource, accessPattern=PileupBuilder.BamAccessPattern.Streaming)
val pileOut = PileupBuilder(source=SamSource(out), accessPattern=PileupBuilder.BamAccessPattern.Streaming)
var totalIn = 0L
var totalOut = 0L

builder.dict.foreach { chrom =>
Range.inclusive(1, chrom.length).foreach { pos =>
val covIn = pileIn.pileup(chrom.name, pos).withoutOverlaps.depth
val covOut = pileOut.pileup(chrom.name, pos).withoutOverlaps.depth
totalIn += covIn
totalOut += covOut


covOut <= covIn shouldBe true
if (covIn < coverage) {
covOut shouldBe covIn
}
else {
covOut should be >= coverage
}
}
}

val meanIn = totalIn / builder.dict.sumBy(_.length).toDouble
val meanOut = totalOut / builder.dict.sumBy(_.length).toDouble

meanOut should be < meanIn
meanOut should be < (coverage * 2).toDouble
meanOut should be > coverage.toDouble
}
}