diff --git a/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala b/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala index 16b75316..de9a3a09 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/GatkTask.scala @@ -23,18 +23,48 @@ */ package dagr.tasks.gatk -import java.nio.file.Path +import java.nio.file.{Files, Path} +import java.util.jar.Attributes.Name +import java.util.jar.JarInputStream import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem.{FixedResources, ProcessTask} -import dagr.tasks.{DagrDef, JarTask} -import DagrDef.{PathToFasta, PathToIntervals} +import dagr.tasks.JarTask +import com.fulcrumgenomics.commons.CommonsDef._ import scala.collection.mutable.ListBuffer object GatkTask { + /** The config path to the primary GATK Jar. */ val GatkJarPathConfigKey = "gatk.jar" + + private[gatk] val ImplementationVersion = new Name("Implementation-Version") + private[gatk] val MainClass = new Name("Main-Class") + + /** Attempts to determine the major version from the GATK Jar. */ + def majorVersion(jar: FilePath): Int = { + val in = new JarInputStream(Files.newInputStream(jar)) + val attrs = in.getManifest.getMainAttributes + in.close() + + if (attrs.containsKey(ImplementationVersion)) { + attrs.getValue(ImplementationVersion).takeWhile(_ != '.').toInt + } + else { + attrs.getValue(MainClass) match { + case "org.broadinstitute.sting.gatk.CommandLineGATK" => 1 + case "org.broadinstitute.gatk.engine.CommandLineGATK" => 3 + case x => throw new IllegalArgumentException(s"Couldn't determine GATK version from jar $jar") + } + } + } + + /** The path to the "standard" GATK jar. */ + lazy val GatkJarPath: FilePath = Configuration.configure[Path](GatkTask.GatkJarPathConfigKey) + + /** The major version of the "standard" GATK jar. */ + lazy val GatkMajorVersion: Int = majorVersion(GatkJarPath) } @@ -49,12 +79,18 @@ abstract class GatkTask(val walker: String, requires(Cores(1), Memory("4g")) override def args: Seq[Any] = { - val buffer = ListBuffer[Any]() - buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory)) - buffer.append("-T", this.walker) + val buffer = ListBuffer[Any]() + val jvmArgs = if (gatkMajorVersion >= 4) bamCompression.map(c => s"-Dsamjdk.compression_level=$c") else None + buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory, jvmArgs=jvmArgs)) + + if (gatkMajorVersion < 4) buffer += "-T" + buffer += this.walker + + if (gatkMajorVersion < 4) bamCompression.foreach(c => buffer.append("--bam_compression", c)) + buffer.append("-R", this.ref.toAbsolutePath.toString) intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString)) - bamCompression.foreach(c => buffer.append("--bam_compression", c)) + addWalkerArgs(buffer) buffer.toSeq } @@ -62,5 +98,13 @@ abstract class GatkTask(val walker: String, /** Can be overridden to use a specific GATK jar. */ protected def gatkJar: Path = configure[Path](GatkTask.GatkJarPathConfigKey) + /** The version of GATK being used by this task. */ + protected lazy val gatkMajorVersion: Int = { + val jar = gatkJar + if (jar == GatkTask.GatkJarPath) GatkTask.GatkMajorVersion + else GatkTask.majorVersion(jar) + } + + /** Adds arguments specific to the walker. */ protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala b/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala index d675690b..366e3fbd 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/GenotypeGvcfs.scala @@ -50,9 +50,18 @@ class GenotypeGvcfs private (ref: PathToFasta, val dbSnpVcf: Option[PathToVcf] = None) extends GatkTask("GenotypeGVCFs", ref, intervals=intervals) { + require(gvcfs.length == 1 || gatkMajorVersion < 4, "GenotypeGVCFs only supports one GVCF at a time with GATK version 4+.") + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath.toString)) - gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath.toString)) - buffer.append("-o", vcf.toAbsolutePath.toString) + // Args that are common to all versions + dbSnpVcf.foreach(v => buffer.append("--dbsnp", v.toAbsolutePath)) + gvcfs.foreach(gvcf => buffer.append("-V", gvcf.toAbsolutePath)) + + gatkMajorVersion match { + case n if n < 4 => + buffer.append("--out", vcf.toAbsolutePath) + case n if n >= 4 => + buffer.append("--output", vcf.toAbsolutePath) + } } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala b/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala index e20c2367..f04cb5b7 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/HaplotypeCaller.scala @@ -36,6 +36,7 @@ class HaplotypeCaller(ref: PathToFasta, val bam: PathToBam, val vcf: PathToVcf, val maxAlternateAlleles: Int = 3, + val maxHaplotypes: Int = 200, val minPruning: Int = 3, val contaminationFraction: Double = 0.0, val rnaMode: Boolean = false, @@ -47,25 +48,47 @@ class HaplotypeCaller(ref: PathToFasta, extends GatkTask("HaplotypeCaller", ref, intervals=intervals) { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - buffer.append("--minPruning", minPruning) - buffer.append("--maxNumHaplotypesInPopulation", "200") - buffer.append("-variant_index_parameter", "128000") - buffer.append("-variant_index_type", "LINEAR") - buffer.append("--emitRefConfidence", "GVCF") - ploidy.foreach(p => buffer.append("--sample_ploidy", p)) - buffer.append("--max_alternate_alleles", maxAlternateAlleles) - maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n)) - minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n)) - buffer.append("--contamination_fraction_to_filter", contaminationFraction) + // Args common between versions buffer.append("-I", bam.toAbsolutePath) - buffer.append("-o", vcf.toAbsolutePath) - if (rnaMode) { - buffer.append("-dontUseSoftClippedBases") - buffer.append("-stand_call_conf", 20) - buffer.append("-stand_emit_conf", 20) - } + gatkMajorVersion match { + case n if n < 4 => + // Args for versions 1-3 + buffer.append("-o", vcf.toAbsolutePath) + buffer.append("--minPruning", minPruning) + buffer.append("--maxNumHaplotypesInPopulation", maxHaplotypes) + buffer.append("--emitRefConfidence", "GVCF") + buffer.append("--max_alternate_alleles", maxAlternateAlleles) + buffer.append("--contamination_fraction_to_filter", contaminationFraction) + ploidy.foreach(p => buffer.append("--sample_ploidy", p)) + if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING") + + buffer.append("-variant_index_parameter", "128000") + buffer.append("-variant_index_type", "LINEAR") + maxReadsInRegionPerSample.foreach(n => buffer.append("--maxReadsInRegionPerSample", n)) + minReadsPerAlignmentStart.foreach(n => buffer.append("--minReadsPerAlignmentStart", n)) + + if (rnaMode) { + buffer.append("-dontUseSoftClippedBases") + buffer.append("-stand_call_conf", 20) + buffer.append("-stand_emit_conf", 20) + } - if (useNativePairHmm) buffer.append("-pairHMM", "VECTOR_LOGLESS_CACHING") + case n if n >= 4 => + // Args for version 4 + buffer.append("-O", vcf.toAbsolutePath) + buffer.append("--min-pruning", minPruning) + buffer.append("--max-num-haplotypes-in-population", "200") + buffer.append("--emit-ref-confidence", "GVCF") + buffer.append("--max-alternate-alleles", maxAlternateAlleles) + buffer.append("--contamination-fraction-to-filter", contaminationFraction) + ploidy.foreach(p => buffer.append("--sample-ploidy", p)) + if (useNativePairHmm) buffer.append("-pairHMM", "FASTEST_AVAILABLE") + + if (rnaMode) { + buffer.append("--dont-use-soft-clipped-bases") + buffer.append("-stand-call-conf", 20) + } + } } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala b/tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala index 4706b9b1..4b251b3f 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/IndelRealigner.scala @@ -95,6 +95,7 @@ class RealignerTargetCreator(val in: Seq[PathToBam], intervals: Option[PathToIntervals]) extends GatkTask(walker = "RealignerTargetCreator", ref = ref, intervals = intervals) { + require(gatkMajorVersion < 4, s"RealignerTargetCreator is not supported in GATK v$gatkMajorVersion.") override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { in.foreach(bam => buffer.append("-I", bam)) buffer.append("-o", out) @@ -111,6 +112,8 @@ class IndelRealigner(val in: Seq[PathToBam], bamCompression: Option[Int] = None) extends GatkTask(walker = "IndelRealigner", ref = ref, bamCompression = bamCompression) { + require(gatkMajorVersion < 4, s"IndelRealigner is not supported in GATK v$gatkMajorVersion.") + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { in.foreach(bam => buffer.append("-I", bam)) known.foreach(vcf => buffer.append("-known", vcf)) diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala b/tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala index 055365f6..cf6c0751 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/Mutect1.scala @@ -50,6 +50,8 @@ class Mutect1(val tumorBam: PathToBam, val minBaseQuality: Int = 5 ) extends GatkTask(walker="MuTect", ref=ref, intervals=Some(intervals)) { + require(gatkMajorVersion < 4, s"Mutect v1 is not supported in GATK v${gatkMajorVersion}.") + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { buffer.append("-o", callStatsOutput) buffer.append("-vcf", vcfOutput) diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala b/tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala index 61c5f446..cfece3e5 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/Mutect2.scala @@ -24,8 +24,8 @@ package dagr.tasks.gatk import dagr.core.tasksystem.FixedResources -import dagr.tasks.DagrDef -import DagrDef.{PathToBam, PathToFasta, PathToIntervals, PathToVcf} +import com.fulcrumgenomics.commons.CommonsDef._ +import htsjdk.samtools.SamReaderFactory import scala.collection.mutable.ListBuffer @@ -43,11 +43,28 @@ class Mutect2(val tumorBam: PathToBam, ) extends GatkTask(walker="MuTect2", ref=ref, intervals=Some(intervals)) with FixedResources { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - buffer.append("-I:tumor", tumorBam) - buffer.append("-I:normal", normalBam) - buffer.append("-o", output) - buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction) - buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount) - buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum) + gatkMajorVersion match { + case n if n < 4 => + buffer.append("-I:tumor", tumorBam) + buffer.append("-I:normal", normalBam) + buffer.append("-o", output) + buffer.append("--max_alt_allele_in_normal_fraction", maxAltAlleleInNormalFraction) + buffer.append("--max_alt_alleles_in_normal_count", maxAltAlleleInNormalCount) + buffer.append("--max_alt_alleles_in_normal_qscore_sum", maxAltAlleleInNormalBqSum) + case n if n >= 4 => + buffer.append("-I", tumorBam) + buffer.append("-I", normalBam) + sampleNames(normalBam).foreach { n => buffer.append("--normal-sample", n) } + buffer.append("-O", output) + } + } + + /** Gets the unique set of samples names in the RG headers of a BAM. */ + private def sampleNames(bam: PathToBam): Seq[String] = { + val in = SamReaderFactory.make().open(bam) + val rgs = in.getFileHeader.getReadGroups + in.close() + + rgs.flatMap(rg => Option(rg.getSample)).toIndexedSeq.distinct } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala b/tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala index 6ac5d0f1..31af3adb 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/SplitNCigarReads.scala @@ -37,7 +37,13 @@ class SplitNCigarReads(val in: PathToBam, val out: PathToBam, ref:PathToFasta, b override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { buffer.append("-I", in) - buffer.append("-o", out) - buffer.append("-U", "ALLOW_N_CIGAR_READS") + + gatkMajorVersion match { + case n if n < 4 => + buffer.append("-o", out) + buffer.append("-U", "ALLOW_N_CIGAR_READS") + case n if n >= 4 => + buffer.append("-O", out) + } } }