Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Attempt to make the GATK tasks a) auto-detect GATK version and b) swi… #365

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,48 @@
*/
package dagr.tasks.gatk

import java.nio.file.Path
import java.nio.file.{Files, Path}
import java.util.jar.Attributes.Name
import java.util.jar.JarInputStream

import dagr.core.config.Configuration
import dagr.core.execsystem.{Cores, Memory}
import dagr.core.tasksystem.{FixedResources, ProcessTask}
import dagr.tasks.{DagrDef, JarTask}
import DagrDef.{PathToFasta, PathToIntervals}
import dagr.tasks.JarTask
import com.fulcrumgenomics.commons.CommonsDef._

import scala.collection.mutable.ListBuffer

object GatkTask {
/** The config path to the primary GATK Jar. */
val GatkJarPathConfigKey = "gatk.jar"

private[gatk] val ImplementationVersion = new Name("Implementation-Version")
private[gatk] val MainClass = new Name("Main-Class")

/** Attempts to determine the major version from the GATK Jar. */
def majorVersion(jar: FilePath): Int = {
val in = new JarInputStream(Files.newInputStream(jar))
val attrs = in.getManifest.getMainAttributes
in.close()

if (attrs.containsKey(ImplementationVersion)) {
attrs.getValue(ImplementationVersion).takeWhile(_ != '.').toInt
}
else {
attrs.getValue(MainClass) match {
case "org.broadinstitute.sting.gatk.CommandLineGATK" => 1
case "org.broadinstitute.gatk.engine.CommandLineGATK" => 3
case x => throw new IllegalArgumentException(s"Couldn't determine GATK version from jar $jar")
}
}
}
nh13 marked this conversation as resolved.
Show resolved Hide resolved

/** The path to the "standard" GATK jar. */
lazy val GatkJarPath: FilePath = Configuration.configure[Path](GatkTask.GatkJarPathConfigKey)

/** The major version of the "standard" GATK jar. */
lazy val GatkMajorVersion: Int = majorVersion(GatkJarPath)
}


Expand All @@ -49,18 +79,32 @@ abstract class GatkTask(val walker: String,
requires(Cores(1), Memory("4g"))

override def args: Seq[Any] = {
val buffer = ListBuffer[Any]()
buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory))
buffer.append("-T", this.walker)
val buffer = ListBuffer[Any]()
val jvmArgs = if (gatkMajorVersion >= 4) bamCompression.map(c => s"-Dsamjdk.compression_level=$c") else None
buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory, jvmArgs=jvmArgs))

if (gatkMajorVersion < 4) buffer += "-T"
buffer += this.walker

if (gatkMajorVersion < 4) bamCompression.foreach(c => buffer.append("--bam_compression", c))

buffer.append("-R", this.ref.toAbsolutePath.toString)
intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString))
bamCompression.foreach(c => buffer.append("--bam_compression", c))

addWalkerArgs(buffer)
buffer.toSeq
}

/** Can be overridden to use a specific GATK jar. */
protected def gatkJar: Path = configure[Path](GatkTask.GatkJarPathConfigKey)

/** The version of GATK being used by this task. */
protected lazy val gatkMajorVersion: Int = {
val jar = gatkJar
if (jar == GatkTask.GatkJarPath) GatkTask.GatkMajorVersion
else GatkTask.majorVersion(jar)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto about this being something to bake into JarTask. We could also have a trait too, since not everything is a JAR.


/** Adds arguments specific to the walker. */
protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit
}
15 changes: 12 additions & 3 deletions tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,18 @@ class GenotypeGvcfs private (ref: PathToFasta,
val dbSnpVcf: Option[PathToVcf] = None)
extends GatkTask("GenotypeGVCFs", ref, intervals=intervals) {

require(gvcfs.length == 1 || gatkMajorVersion < 4, "GenotypeGVCFs only supports one GVCF at a time with GATK version 4+.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath.toString))
gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath.toString))
buffer.append("-o", vcf.toAbsolutePath.toString)
// Args that are common to all versions
dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath))
gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath))

gatkMajorVersion match {
case n if n < 4 =>
buffer.append("--out", vcf.toAbsolutePath)
case n if n >= 4 =>
buffer.append("--output", vcf.toAbsolutePath)
}
}
}
57 changes: 40 additions & 17 deletions tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class HaplotypeCaller(ref: PathToFasta,
val bam: PathToBam,
val vcf: PathToVcf,
val maxAlternateAlleles: Int = 3,
val maxHaplotypes: Int = 200,
val minPruning: Int = 3,
val contaminationFraction: Double = 0.0,
val rnaMode: Boolean = false,
Expand All @@ -47,25 +48,47 @@ class HaplotypeCaller(ref: PathToFasta,
extends GatkTask("HaplotypeCaller", ref, intervals=intervals) {

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("--minPruning", minPruning)
buffer.append("--maxNumHaplotypesInPopulation", "200")
buffer.append("-variant_index_parameter", "128000")
buffer.append("-variant_index_type", "LINEAR")
buffer.append("--emitRefConfidence", "GVCF")
ploidy.foreach(p => buffer.append("--sample_ploidy", p))
buffer.append("--max_alternate_alleles", maxAlternateAlleles)
maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n))
minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n))
buffer.append("--contamination_fraction_to_filter", contaminationFraction)
// Args common between versions
buffer.append("-I", bam.toAbsolutePath)
buffer.append("-o", vcf.toAbsolutePath)

if (rnaMode) {
buffer.append("-dontUseSoftClippedBases")
buffer.append("-stand_call_conf", 20)
buffer.append("-stand_emit_conf", 20)
}
gatkMajorVersion match {
case n if n < 4 =>
// Args for versions 1-3
buffer.append("-o", vcf.toAbsolutePath)
buffer.append("--minPruning", minPruning)
buffer.append("--maxNumHaplotypesInPopulation", maxHaplotypes)
buffer.append("--emitRefConfidence", "GVCF")
buffer.append("--max_alternate_alleles", maxAlternateAlleles)
buffer.append("--contamination_fraction_to_filter", contaminationFraction)
ploidy.foreach(p => buffer.append("--sample_ploidy", p))
if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING")

buffer.append("-variant_index_parameter", "128000")
buffer.append("-variant_index_type", "LINEAR")
maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n))
minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n))

if (rnaMode) {
buffer.append("-dontUseSoftClippedBases")
buffer.append("-stand_call_conf", 20)
buffer.append("-stand_emit_conf", 20)
}

if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING")
case n if n >= 4 =>
// Args for version 4
buffer.append("-O", vcf.toAbsolutePath)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In GenotypeGvcfs you have a section before this block that adds common arguments:

// Args that are common to all versions

I'd recommend moving common args similarly, or not have common args in GenotypeGvcfs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost did it, but in fact there's only 1 common arg, and it felt not worth it. Note that output is -o vs.-O. But I can move the -I argument out.

buffer.append("--min-pruning", minPruning)
buffer.append("--max-num-haplotypes-in-population", "200")
buffer.append("--emit-ref-confidence", "GVCF")
buffer.append("--max-alternate-alleles", maxAlternateAlleles)
buffer.append("--contamination-fraction-to-filter", contaminationFraction)
ploidy.foreach(p => buffer.append("--sample-ploidy", p))
if (useNativePairHmm) buffer.append("-pairHMM", "FASTEST_AVAILABLE")

if (rnaMode) {
buffer.append("--dont-use-soft-clipped-bases")
buffer.append("-stand-call-conf", 20)
}
}
}
}
3 changes: 3 additions & 0 deletions tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class RealignerTargetCreator(val in: Seq[PathToBam],
intervals: Option[PathToIntervals])
extends GatkTask(walker = "RealignerTargetCreator", ref = ref, intervals = intervals) {

require(gatkMajorVersion < 4, s"RealignerTargetCreator is not supported in GATK v$gatkMajorVersion.")
override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
in.foreach(bam => buffer.append("-I", bam))
buffer.append("-o", out)
Expand All @@ -111,6 +112,8 @@ class IndelRealigner(val in: Seq[PathToBam],
bamCompression: Option[Int] = None)
extends GatkTask(walker = "IndelRealigner", ref = ref, bamCompression = bamCompression) {

require(gatkMajorVersion < 4, s"IndelRealigner is not supported in GATK v$gatkMajorVersion.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
in.foreach(bam => buffer.append("-I", bam))
known.foreach(vcf => buffer.append("-known", vcf))
Expand Down
2 changes: 2 additions & 0 deletions tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Mutect1(val tumorBam: PathToBam,
val minBaseQuality: Int = 5
) extends GatkTask(walker="MuTect", ref=ref, intervals=Some(intervals)) {

require(gatkMajorVersion < 4, s"Mutect v1 is not supported in GATK v${gatkMajorVersion}.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-o", callStatsOutput)
buffer.append("-vcf", vcfOutput)
Expand Down
33 changes: 25 additions & 8 deletions tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
package dagr.tasks.gatk

import dagr.core.tasksystem.FixedResources
import dagr.tasks.DagrDef
import DagrDef.{PathToBam, PathToFasta, PathToIntervals, PathToVcf}
import com.fulcrumgenomics.commons.CommonsDef._
import htsjdk.samtools.SamReaderFactory

import scala.collection.mutable.ListBuffer

Expand All @@ -43,11 +43,28 @@ class Mutect2(val tumorBam: PathToBam,
) extends GatkTask(walker="MuTect2", ref=ref, intervals=Some(intervals)) with FixedResources {

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-I:tumor", tumorBam)
buffer.append("-I:normal", normalBam)
buffer.append("-o", output)
buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction)
buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount)
buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum)
gatkMajorVersion match {
case n if n < 4 =>
buffer.append("-I:tumor", tumorBam)
buffer.append("-I:normal", normalBam)
buffer.append("-o", output)
buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction)
buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount)
buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum)
case n if n >= 4 =>
buffer.append("-I", tumorBam)
buffer.append("-I", normalBam)
sampleNames(normalBam).foreach { n => buffer.append("--normal-sample", n) }
buffer.append("-O", output)
}
}

/** Gets the unique set of samples names in the RG headers of a BAM. */
private def sampleNames(bam: PathToBam): Seq[String] = {
val in = SamReaderFactory.make().open(bam)
val rgs = in.getFileHeader.getReadGroups
in.close()

rgs.flatMap(rg => Option(rg.getSample)).toIndexedSeq.distinct
}
}
10 changes: 8 additions & 2 deletions tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ class SplitNCigarReads(val in: PathToBam, val out: PathToBam, ref:PathToFasta, b

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-I", in)
buffer.append("-o", out)
buffer.append("-U", "ALLOW_N_CIGAR_READS")

gatkMajorVersion match {
case n if n < 4 =>
buffer.append("-o", out)
buffer.append("-U", "ALLOW_N_CIGAR_READS")
case n if n >= 4 =>
buffer.append("-O", out)
}
}
}