diff --git a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala index 9b5c257bb..61d6d52aa 100644 --- a/src/main/scala/com/fulcrumgenomics/bam/Bams.scala +++ b/src/main/scala/com/fulcrumgenomics/bam/Bams.scala @@ -36,7 +36,7 @@ import htsjdk.samtools.SAMFileHeader.{GroupOrder, SortOrder} import htsjdk.samtools.SamPairUtil.PairOrientation import htsjdk.samtools._ import htsjdk.samtools.reference.ReferenceSequence -import htsjdk.samtools.util.{CloserUtil, CoordMath, SequenceUtil} +import htsjdk.samtools.util.{CloserUtil, CoordMath, Murmur3, SequenceUtil} import java.io.Closeable import scala.math.{max, min} @@ -366,6 +366,39 @@ object Bams extends LazyLogging { new SelfClosingIterator(_iterator, () => queryIterator.close()) } + /** Returns an iterator over records, grouped into [[Template]] objects, in a random order. Reads are first + * sorted based on a hash of their read name in order to randomize the order while collating by read name. + * The randomized reads are then iterated and grouped into [[Template]] objects. + * + * @param in the [[SamSource]] from which to pull reads and the SAM header + * @param randomSeed a seed used to generate the hashes that drive the random sorting. Different seeds will produce + * different randomizations of the reads. Using the same seed will result in the same ordering + * of the data across invocations. + * @param maxInMemory the maximum number of records to keep and sort in memory, if sorting is needed + * @param tmpDir a temp directory to use for temporary sorting files if sorting is needed + */ + def templateRandomIterator(in: SamSource, + randomSeed: Int = 42, + maxInMemory: Int = Bams.MaxInMemory, + tmpDir: DirPath = Io.tmpDir): Iterator[Template] = { + val hasher = new Murmur3(randomSeed) + val sorter = { + val f = (r: SamRecord) => SamOrder.RandomQueryKey(hasher.hashUnencodedChars(r.name), r.name, r.asSam.getFlags) + new Sorter(maxInMemory, new SamRecordCodec(in.header), f) + } + + val sortProgress = ProgressLogger(logger, verb = "sorted", unit = 5e6.toInt) + in.foreach { rec => + sorter += rec + sortProgress.record() + } + + val header = in.header.clone() + SamOrder.RandomQuery.applyTo(header) + Bams.templateIterator(sorter.iterator, header, maxInMemory, tmpDir) + } + + /** Returns an iterator over the records in the given iterator such that the order of the records returned is * determined by the value of the given SAM tag, which can optionally be transformed. * diff --git a/src/main/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBam.scala b/src/main/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBam.scala new file mode 100644 index 000000000..d81f8113c --- /dev/null +++ b/src/main/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBam.scala @@ -0,0 +1,186 @@ +/* + * The MIT License + * + * Copyright (c) 2023 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.fulcrumgenomics.bam + +import com.fulcrumgenomics.FgBioDef._ +import com.fulcrumgenomics.bam.DownsampleAndNormalizeBam.CoverageTracker +import com.fulcrumgenomics.bam.api._ +import com.fulcrumgenomics.cmdline.{ClpGroups, FgBioTool} +import com.fulcrumgenomics.commons.util.LazyLogging +import com.fulcrumgenomics.sopt.{arg, clp} +import com.fulcrumgenomics.util.{Io, ProgressLogger, Sorter} +import htsjdk.samtools.util.IntervalList.IntervalMergerIterator +import htsjdk.samtools.util.{CoordMath, Interval, IntervalList, Murmur3, OverlapDetector} + +/** + * Companion object with helper classes. + */ +private object DownsampleAndNormalizeBam extends LazyLogging { + /** + * Tracks coverage over a set of regions. + */ + class CoverageTracker(regions: Seq[Interval], + val targetCoverage: Int, + val minMapQ: Int + ) { + + // Overlap detector for region => coverage + private val detector = new OverlapDetector[RegionCoverage](0, 0) + regions.foreach { region => + detector.addLhs(new RegionCoverage(region.getContig, region.getStart, region.getEnd), region) + } + + /** + * Checks to see if a template has reads that contribute coverage to any bases that are currently + * below the coverage threshold. If so, coverage from all acceptable reads is accumulated and the + * method returns true. Otherwise no coverage is accumulated and the method returns false. + * + * @param t a Template to accumulate coverage from if it covers any bases that are under the coverage threshold + * @return true if the template added coverage, false otherwise + */ + def add(t: Template): Boolean = { + val recs = t.allReads + .filterNot(_.secondary) + .filterNot(_.unmapped) + .filterNot(_.duplicate) + .filter(_.mapq >= minMapQ) + .toSeq + + val addsCoverage = recs.exists { rec => + detector.getOverlaps(rec.asSam).iterator().exists { cov => + Range.inclusive(math.max(rec.start, cov.start), math.min(rec.end, cov.end)).exists { pos => + cov(pos) < targetCoverage + } + } + } + + if (addsCoverage) { + val intervals = { + val iter = recs + .sortBy(r => (r.refName, r.start, r.end)) + .iterator + .map(r => new Interval(r.refName, r.start, r.end)) + new IntervalMergerIterator(iter, true, false, false) + } + + for (interval <- intervals; cov <- detector.getOverlaps(interval).iterator()) { + cov.add(interval.getStart, interval.getEnd) + } + } + + addsCoverage + } + } + + /** + * Coverage counter for a region - all coordinates in the public API are 1-based closed ended. + */ + class RegionCoverage(val chrom: String, val start: Int, val end: Int) { + private val offset = start + private val counts = new Array[Int](CoordMath.getLength(start, end)) + + /** Returns the coverage at the given genomic position, or -1 if the position is not between start:end. */ + def apply(i: Int): Int = { + if (i < start || i > end) { + throw new IndexOutOfBoundsException(s"Position $i is outside of range for region $chrom:$start-$end.") + } + + this.counts(i - offset) + } + + /** Adds 1 to the coverage counts for all bases between from:to inclusive. */ + def add(from: Int, to: Int): Unit = { + forloop (from=math.max(0, from-offset), until=math.min(counts.length, to-offset+1)) { i => + this.counts(i) += 1 + } + } + } +} + +@clp( + group=ClpGroups.SamOrBam, + description = + """ + |Downsamples a BAM in a biased way to a uniform coverage across regions. + | + |Attempts to downsample a BAM such that every base in the genome (or in the target `regions` if provided) + |is covered by at least `coverage` reads. When computing coverage: + | - Reads marked as secondary, duplicate or unmapped are not used + | - A base can receive coverage from only one read with the same queryname (i.e. mate overlaps are not counted) + | - Coverage is counted if a read _spans_ a base, even if that base is deleted in the read + | + |Reads are first sorted into a random order (by hashing read names). Reads are then consumed one template + |at a time, and if any read adds coverage to base that is under the target coverage, _all_ reads (including + |secondary, unmapped, etc.) for that template are emitted into the output. + | + |Given the procedure used for downsampling, it is likely the output BAM will have coverage up to 2X the requested + |coverage at regions in the input BAM that are i) well covered and ii) are close to regions that are poorly + |covered. + """ +) +class DownsampleAndNormalizeBam +( @arg(flag='i', doc="Input SAM or BAM file.") val input: PathToBam, + @arg(flag='o', doc="Output SAM or BAM file.") val output: PathToBam, + @arg(flag='c', doc="Desired minimum coverage.") val coverage: Int, + @arg(flag='m', doc="Minimum mapping quality to count a read as covering.") val minMapQ: Int = 0, + @arg(flag='s', doc="Random seed to use when randomizing order of reads/templates.") val seed: Int = 42, + @arg(flag='l', doc="Optional set of regions for coverage targeting.") val regions: Option[PathToIntervals] = None, + @arg(flag='M', doc="Maximum records to be held in memory while sorting.") val maxInMemory: Int = Bams.MaxInMemory +) extends FgBioTool { + + Io.assertReadable(input) + Io.assertCanWriteFile(output) + regions.foreach(Io.assertReadable) + + override def execute(): Unit = { + val in = SamSource(input) + val out = SamWriter(output, in.header, sort=SamOrder(in.header)) + + val targets: IndexedSeq[Interval] = regions match { + case Some(rs) => IntervalList.fromPath(rs).uniqued().getIntervals.toIndexedSeq + case None => in.dict.map(s => new Interval(s.name, 1, s.length)).toIndexedSeq + } + + val tracker = new CoverageTracker(targets, this.coverage, this.minMapQ) + var (read, written) = (0L, 0L) + + Bams.templateRandomIterator(in, this.seed, maxInMemory).foreach { template => + read += 1 + + if (tracker.add(template)) { + out ++= template.allReads + written += 1 + } + + if (read % 1000000 == 0) { + logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d") + } + } + + in.safelyClose() + out.close() + logger.info(f"Read ${read}%,d templates and wrote out ${written}%,d") + } +} diff --git a/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala b/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala index b7c300097..c6d1b699e 100644 --- a/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala +++ b/src/test/scala/com/fulcrumgenomics/bam/BamsTest.scala @@ -218,6 +218,26 @@ class BamsTest extends UnitSpec { } } + "Bams.templateRandomIterator" should "return template objects in a random order" in { + val builder = new SamBuilder(sort = Some(SamOrder.Coordinate)) + builder.addPair(name = "p1", contig=0, start1 = 100, start2 = 500) + builder.addPair(name = "p2", contig=0, start1 = 200, start2 = 600) + builder.addPair(name = "p3", contig=0, start1 = 300, start2 = 700) + builder.addPair(name = "p4", contig=0, start1 = 400, start2 = 800) + builder.addPair(name = "p5", contig=0, start1 = 400, start2 = 800) + builder.addPair(name = "p6", contig=0, start1 = 400, start2 = 800) + builder.addPair(name = "p7", contig=0, start1 = 400, start2 = 800) + builder.addPair(name = "p8", contig=0, start1 = 400, start2 = 800) + + val ts1 = Bams.templateRandomIterator(builder.toSource, randomSeed=1).toIndexedSeq + val ts2 = Bams.templateRandomIterator(builder.toSource, randomSeed=12345).toIndexedSeq + + ts1 should have size 8 + ts2 should have size 8 + ts1.map(_.name).sorted shouldBe ts2.map(_.name).sorted + ts1.map(_.name) should not be ts2.map(_.name) + } + "Bams.regenerateNmUqMdTags" should "null out fields on unmapped reads" in { val builder = new SamBuilder(sort=Some(SamOrder.Coordinate)) val rec = builder.addFrag(unmapped=true).get diff --git a/src/test/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBamTest.scala b/src/test/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBamTest.scala new file mode 100644 index 000000000..b92261de4 --- /dev/null +++ b/src/test/scala/com/fulcrumgenomics/bam/DownsampleAndNormalizeBamTest.scala @@ -0,0 +1,104 @@ +/* + * The MIT License + * + * Copyright (c) 2023 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +package com.fulcrumgenomics.bam + +import com.fulcrumgenomics.bam.api.{SamOrder, SamSource} +import com.fulcrumgenomics.FgBioDef._ +import com.fulcrumgenomics.bam.pileup.PileupBuilder +import com.fulcrumgenomics.fasta.{SequenceDictionary, SequenceMetadata} +import com.fulcrumgenomics.testing.{SamBuilder, UnitSpec} + +class DownsampleAndNormalizeBamTest extends UnitSpec { + def newBuilder(readLength: Int = 50): SamBuilder = { + /** Makes a new builder with two short chromosomes. */ + new SamBuilder( + readLength = readLength, + sort = Some(SamOrder.Coordinate), + sd = Some(SequenceDictionary( + SequenceMetadata(name="chr1", length=1000, index=0), + SequenceMetadata(name="chr2", length=1000, index=0), + )) + ) + } + + /** Generates a temp file and returns its path. */ + def newBam(): PathToBam = makeTempFile("downsampled.", ".bam") + + + "DownsampleAndNormalizeBam" should "run ok on an empty bam" in { + val empty = newBuilder().toTempFile() + val out = newBam() + new DownsampleAndNormalizeBam(input=empty, output=out, coverage=100).execute() + val recs = SamSource(out).toIndexedSeq + recs.isEmpty shouldBe true + } + + it should "downsample a BAM with excess coverage" in { + val builder = newBuilder() + val out = newBam() + val coverage = 30 + + // Add lots of reads - 25 read pairs starting at each position until we run out of room + builder.dict.foreach { chrom => + Range.inclusive(1, chrom.length - 125).foreach { start => + forloop (from=0, until=5) { _ => + builder.addPair(contig=chrom.index, start1=start , start2=start+75) + } + } + } + + new DownsampleAndNormalizeBam(input = builder.toTempFile(), output = out, coverage = coverage).execute() + + val pileIn = PileupBuilder(source=builder.toSource, accessPattern=PileupBuilder.BamAccessPattern.Streaming) + val pileOut = PileupBuilder(source=SamSource(out), accessPattern=PileupBuilder.BamAccessPattern.Streaming) + var totalIn = 0L + var totalOut = 0L + + builder.dict.foreach { chrom => + Range.inclusive(1, chrom.length).foreach { pos => + val covIn = pileIn.pileup(chrom.name, pos).withoutOverlaps.depth + val covOut = pileOut.pileup(chrom.name, pos).withoutOverlaps.depth + totalIn += covIn + totalOut += covOut + + + covOut <= covIn shouldBe true + if (covIn < coverage) { + covOut shouldBe covIn + } + else { + covOut should be >= coverage + } + } + } + + val meanIn = totalIn / builder.dict.sumBy(_.length).toDouble + val meanOut = totalOut / builder.dict.sumBy(_.length).toDouble + + meanOut should be < meanIn + meanOut should be < (coverage * 2).toDouble + meanOut should be > coverage.toDouble + } +}