Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Make the GATK tasks a) auto-detect GATK version
Browse files Browse the repository at this point in the history
Make the GATK tasks a) auto-detect GATK version and b) switch command lines accordingly.
  • Loading branch information
tfenne authored Aug 22, 2019
1 parent 2d84ae1 commit 2b0bab7
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 37 deletions.
58 changes: 51 additions & 7 deletions tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,48 @@
*/
package dagr.tasks.gatk

import java.nio.file.Path
import java.nio.file.{Files, Path}
import java.util.jar.Attributes.Name
import java.util.jar.JarInputStream

import dagr.core.config.Configuration
import dagr.core.execsystem.{Cores, Memory}
import dagr.core.tasksystem.{FixedResources, ProcessTask}
import dagr.tasks.{DagrDef, JarTask}
import DagrDef.{PathToFasta, PathToIntervals}
import dagr.tasks.JarTask
import com.fulcrumgenomics.commons.CommonsDef._

import scala.collection.mutable.ListBuffer

object GatkTask {
/** The config path to the primary GATK Jar. */
val GatkJarPathConfigKey = "gatk.jar"

private[gatk] val ImplementationVersion = new Name("Implementation-Version")
private[gatk] val MainClass = new Name("Main-Class")

/** Attempts to determine the major version from the GATK Jar. */
def majorVersion(jar: FilePath): Int = {
val in = new JarInputStream(Files.newInputStream(jar))
val attrs = in.getManifest.getMainAttributes
in.close()

if (attrs.containsKey(ImplementationVersion)) {
attrs.getValue(ImplementationVersion).takeWhile(_ != '.').toInt
}
else {
attrs.getValue(MainClass) match {
case "org.broadinstitute.sting.gatk.CommandLineGATK" => 1
case "org.broadinstitute.gatk.engine.CommandLineGATK" => 3
case x => throw new IllegalArgumentException(s"Couldn't determine GATK version from jar $jar")
}
}
}

/** The path to the "standard" GATK jar. */
lazy val GatkJarPath: FilePath = Configuration.configure[Path](GatkTask.GatkJarPathConfigKey)

/** The major version of the "standard" GATK jar. */
lazy val GatkMajorVersion: Int = majorVersion(GatkJarPath)
}


Expand All @@ -49,18 +79,32 @@ abstract class GatkTask(val walker: String,
requires(Cores(1), Memory("4g"))

override def args: Seq[Any] = {
val buffer = ListBuffer[Any]()
buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory))
buffer.append("-T", this.walker)
val buffer = ListBuffer[Any]()
val jvmArgs = if (gatkMajorVersion >= 4) bamCompression.map(c => s"-Dsamjdk.compression_level=$c") else None
buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory, jvmArgs=jvmArgs))

if (gatkMajorVersion < 4) buffer += "-T"
buffer += this.walker

if (gatkMajorVersion < 4) bamCompression.foreach(c => buffer.append("--bam_compression", c))

buffer.append("-R", this.ref.toAbsolutePath.toString)
intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString))
bamCompression.foreach(c => buffer.append("--bam_compression", c))

addWalkerArgs(buffer)
buffer.toSeq
}

/** Can be overridden to use a specific GATK jar. */
protected def gatkJar: Path = configure[Path](GatkTask.GatkJarPathConfigKey)

/** The version of GATK being used by this task. */
protected lazy val gatkMajorVersion: Int = {
val jar = gatkJar
if (jar == GatkTask.GatkJarPath) GatkTask.GatkMajorVersion
else GatkTask.majorVersion(jar)
}

/** Adds arguments specific to the walker. */
protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit
}
15 changes: 12 additions & 3 deletions tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,18 @@ class GenotypeGvcfs private (ref: PathToFasta,
val dbSnpVcf: Option[PathToVcf] = None)
extends GatkTask("GenotypeGVCFs", ref, intervals=intervals) {

require(gvcfs.length == 1 || gatkMajorVersion < 4, "GenotypeGVCFs only supports one GVCF at a time with GATK version 4+.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath.toString))
gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath.toString))
buffer.append("-o", vcf.toAbsolutePath.toString)
// Args that are common to all versions
dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath))
gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath))

gatkMajorVersion match {
case n if n < 4 =>
buffer.append("--out", vcf.toAbsolutePath)
case n if n >= 4 =>
buffer.append("--output", vcf.toAbsolutePath)
}
}
}
57 changes: 40 additions & 17 deletions tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class HaplotypeCaller(ref: PathToFasta,
val bam: PathToBam,
val vcf: PathToVcf,
val maxAlternateAlleles: Int = 3,
val maxHaplotypes: Int = 200,
val minPruning: Int = 3,
val contaminationFraction: Double = 0.0,
val rnaMode: Boolean = false,
Expand All @@ -47,25 +48,47 @@ class HaplotypeCaller(ref: PathToFasta,
extends GatkTask("HaplotypeCaller", ref, intervals=intervals) {

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("--minPruning", minPruning)
buffer.append("--maxNumHaplotypesInPopulation", "200")
buffer.append("-variant_index_parameter", "128000")
buffer.append("-variant_index_type", "LINEAR")
buffer.append("--emitRefConfidence", "GVCF")
ploidy.foreach(p => buffer.append("--sample_ploidy", p))
buffer.append("--max_alternate_alleles", maxAlternateAlleles)
maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n))
minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n))
buffer.append("--contamination_fraction_to_filter", contaminationFraction)
// Args common between versions
buffer.append("-I", bam.toAbsolutePath)
buffer.append("-o", vcf.toAbsolutePath)

if (rnaMode) {
buffer.append("-dontUseSoftClippedBases")
buffer.append("-stand_call_conf", 20)
buffer.append("-stand_emit_conf", 20)
}
gatkMajorVersion match {
case n if n < 4 =>
// Args for versions 1-3
buffer.append("-o", vcf.toAbsolutePath)
buffer.append("--minPruning", minPruning)
buffer.append("--maxNumHaplotypesInPopulation", maxHaplotypes)
buffer.append("--emitRefConfidence", "GVCF")
buffer.append("--max_alternate_alleles", maxAlternateAlleles)
buffer.append("--contamination_fraction_to_filter", contaminationFraction)
ploidy.foreach(p => buffer.append("--sample_ploidy", p))
if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING")

buffer.append("-variant_index_parameter", "128000")
buffer.append("-variant_index_type", "LINEAR")
maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n))
minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n))

if (rnaMode) {
buffer.append("-dontUseSoftClippedBases")
buffer.append("-stand_call_conf", 20)
buffer.append("-stand_emit_conf", 20)
}

if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING")
case n if n >= 4 =>
// Args for version 4
buffer.append("-O", vcf.toAbsolutePath)
buffer.append("--min-pruning", minPruning)
buffer.append("--max-num-haplotypes-in-population", "200")
buffer.append("--emit-ref-confidence", "GVCF")
buffer.append("--max-alternate-alleles", maxAlternateAlleles)
buffer.append("--contamination-fraction-to-filter", contaminationFraction)
ploidy.foreach(p => buffer.append("--sample-ploidy", p))
if (useNativePairHmm) buffer.append("-pairHMM", "FASTEST_AVAILABLE")

if (rnaMode) {
buffer.append("--dont-use-soft-clipped-bases")
buffer.append("-stand-call-conf", 20)
}
}
}
}
3 changes: 3 additions & 0 deletions tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class RealignerTargetCreator(val in: Seq[PathToBam],
intervals: Option[PathToIntervals])
extends GatkTask(walker = "RealignerTargetCreator", ref = ref, intervals = intervals) {

require(gatkMajorVersion < 4, s"RealignerTargetCreator is not supported in GATK v$gatkMajorVersion.")
override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
in.foreach(bam => buffer.append("-I", bam))
buffer.append("-o", out)
Expand All @@ -111,6 +112,8 @@ class IndelRealigner(val in: Seq[PathToBam],
bamCompression: Option[Int] = None)
extends GatkTask(walker = "IndelRealigner", ref = ref, bamCompression = bamCompression) {

require(gatkMajorVersion < 4, s"IndelRealigner is not supported in GATK v$gatkMajorVersion.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
in.foreach(bam => buffer.append("-I", bam))
known.foreach(vcf => buffer.append("-known", vcf))
Expand Down
2 changes: 2 additions & 0 deletions tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Mutect1(val tumorBam: PathToBam,
val minBaseQuality: Int = 5
) extends GatkTask(walker="MuTect", ref=ref, intervals=Some(intervals)) {

require(gatkMajorVersion < 4, s"Mutect v1 is not supported in GATK v${gatkMajorVersion}.")

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-o", callStatsOutput)
buffer.append("-vcf", vcfOutput)
Expand Down
33 changes: 25 additions & 8 deletions tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
package dagr.tasks.gatk

import dagr.core.tasksystem.FixedResources
import dagr.tasks.DagrDef
import DagrDef.{PathToBam, PathToFasta, PathToIntervals, PathToVcf}
import com.fulcrumgenomics.commons.CommonsDef._
import htsjdk.samtools.SamReaderFactory

import scala.collection.mutable.ListBuffer

Expand All @@ -43,11 +43,28 @@ class Mutect2(val tumorBam: PathToBam,
) extends GatkTask(walker="MuTect2", ref=ref, intervals=Some(intervals)) with FixedResources {

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-I:tumor", tumorBam)
buffer.append("-I:normal", normalBam)
buffer.append("-o", output)
buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction)
buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount)
buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum)
gatkMajorVersion match {
case n if n < 4 =>
buffer.append("-I:tumor", tumorBam)
buffer.append("-I:normal", normalBam)
buffer.append("-o", output)
buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction)
buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount)
buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum)
case n if n >= 4 =>
buffer.append("-I", tumorBam)
buffer.append("-I", normalBam)
sampleNames(normalBam).foreach { n => buffer.append("--normal-sample", n) }
buffer.append("-O", output)
}
}

/** Gets the unique set of samples names in the RG headers of a BAM. */
private def sampleNames(bam: PathToBam): Seq[String] = {
val in = SamReaderFactory.make().open(bam)
val rgs = in.getFileHeader.getReadGroups
in.close()

rgs.flatMap(rg => Option(rg.getSample)).toIndexedSeq.distinct
}
}
10 changes: 8 additions & 2 deletions tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ class SplitNCigarReads(val in: PathToBam, val out: PathToBam, ref:PathToFasta, b

override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = {
buffer.append("-I", in)
buffer.append("-o", out)
buffer.append("-U", "ALLOW_N_CIGAR_READS")

gatkMajorVersion match {
case n if n < 4 =>
buffer.append("-o", out)
buffer.append("-U", "ALLOW_N_CIGAR_READS")
case n if n >= 4 =>
buffer.append("-O", out)
}
}
}

0 comments on commit 2b0bab7

Please sign in to comment.