Rudimentary support for other output types, support for scattering extract cohort (#6949)

* cohort extract work for WGS

* rudimentary support for configurable output types

* support for nocalls

* extract wdl, missing classes, and compiler warning

* tidying
kcibul authored and Marianie-Simeon committed Feb 16, 2021
1 parent 165ca9f commit 058c952
Showing 12 changed files with 377 additions and 37 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -332,6 +332,8 @@ dependencies {
compile('org.apache.hadoop:hadoop-client:' + hadoopVersion) // should be a 'provided' dependency
compile('com.github.jsr203hadoop:jsr203hadoop:1.0.3')

compile('org.apache.orc:orc:1.6.5')

compile('de.javakaffee:kryo-serializers:0.41') {
exclude module: 'kryo' // use Spark's version
}
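
The new org.apache.orc dependency backs the experimental ORC output type introduced later in this commit (see the OutputType enum and the --output-type argument below). As a rough, hypothetical sketch of what that library provides, the snippet below writes a single row with the ORC core vectorized writer; the schema and column names are illustrative assumptions, not the actual ingest schema, which is not shown in this commit view.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

public class OrcWriteSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical PET-like layout; the real column layout lives elsewhere in the codebase.
        TypeDescription schema =
                TypeDescription.fromString("struct<location:bigint,sample_id:bigint,state:string>");
        Writer writer = OrcFile.createWriter(
                new Path("pet_sketch.orc"),
                OrcFile.writerOptions(new Configuration()).setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector location = (LongColumnVector) batch.cols[0];
        LongColumnVector sampleId = (LongColumnVector) batch.cols[1];
        BytesColumnVector state = (BytesColumnVector) batch.cols[2];
        state.initBuffer();

        int row = batch.size++;                    // claim the next row in the batch
        location.vector[row] = 1_000_000_010_000L; // arbitrary example value
        sampleId.vector[row] = 42L;
        state.setVal(row, "v".getBytes());

        writer.addRowBatch(batch);                 // flush the batch, then close the file
        writer.close();
    }
}
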
13 changes: 13 additions & 0 deletions scripts/variantstore_wdl/ngs_cohort_extract.inputs.json
@@ -0,0 +1,13 @@
{
"NgsCohortExtract.reference": "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta",
"NgsCohortExtract.reference_index": "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.fasta.fai",
"NgsCohortExtract.reference_dict": "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict",

"NgsCohortExtract.gatk_override": "gs://broad-dsp-spec-ops/kcibul/gatk-package-4.1.8.1-140-g8aa14d3-SNAPSHOT-local.jar",

"NgsCohortExtract.fq_sample_table": "spec-ops-aou.kc_high_cov_ccdg.cohort_100_of_194",
"NgsCohortExtract.fq_cohort_extract_table": "spec-ops-aou.kc_high_cov_ccdg.exported_cohort_100_test",
"NgsCohortExtract.query_project": "spec-ops-aou",

"NgsCohortExtract.output_file_base_name": "ccdg_high_cov_export_100"
}
115 changes: 115 additions & 0 deletions scripts/variantstore_wdl/ngs_cohort_extract.wdl
@@ -0,0 +1,115 @@
version 1.0

workflow NgsCohortExtract {
input {
Int max_chrom_id = 24

# bug in cromwell, can't support large integers...
# https://github.com/broadinstitute/cromwell/issues/2685
String chrom_offset = "1000000000000"

File reference
File reference_index
File reference_dict

String fq_sample_table
String fq_cohort_extract_table
String query_project

String output_file_base_name
File? gatk_override
}

scatter(i in range(max_chrom_id)) {
call ExtractTask {
input:
gatk_override = gatk_override,
reference = reference,
reference_index = reference_index,
reference_dict = reference_dict,
fq_sample_table = fq_sample_table,
chrom_offset = chrom_offset,
chrom_id = i+1,
fq_cohort_extract_table = fq_cohort_extract_table,
read_project_id = query_project,
output_file = "${output_file_base_name}_${i}.vcf.gz"
}
}
}

################################################################################
task ExtractTask {
# indicates that this task should NOT be call cached
meta {
volatile: true
}

input {
# ------------------------------------------------
# Input args:
File reference
File reference_index
File reference_dict

String fq_sample_table

# bug in cromwell, can't support large integers...
# https://github.com/broadinstitute/cromwell/issues/2685
String chrom_offset
Int chrom_id

String fq_cohort_extract_table
String read_project_id
String output_file

# Runtime Options:
File? gatk_override

Int? local_sort_max_records_in_ram = 10000000
}


# ------------------------------------------------
# Run our command:
command <<<
set -e
export GATK_LOCAL_JAR=~{default="/root/gatk.jar" gatk_override}

df -h
min_location=$(echo "~{chrom_id} * ~{chrom_offset}" | bc)
max_location=$(echo "( ~{chrom_id} + 1 ) * ~{chrom_offset}" | bc)

gatk --java-options "-Xmx4g" \
ExtractCohort \
--mode GENOMES --ref-version 38 --query-mode LOCAL_SORT \
-R "~{reference}" \
-O "~{output_file}" \
--local-sort-max-records-in-ram ~{local_sort_max_records_in_ram} \
--sample-table ~{fq_sample_table} \
--cohort-extract-table ~{fq_cohort_extract_table} \
--min-location ${min_location} --max-location ${max_location} \
--project-id ~{read_project_id}
>>>

# ------------------------------------------------
# Runtime settings:
runtime {
docker: "us.gcr.io/broad-dsde-methods/broad-gatk-snapshots:varstore_d8a72b825eab2d979c8877448c0ca948fd9b34c7_change_to_hwe"
memory: "7 GB"
disks: "local-disk 10 HDD"
bootDiskSizeGb: 15
preemptible: 3
cpu: 2
}

# ------------------------------------------------
# Outputs:
output {
File output_vcf = "~{output_file}"
File output_vcf_index = "~{output_file}.tbi"
}
}
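
For orientation: the scatter above shards the extract by encoded genomic location. chrom_offset is kept as a String and multiplied with bc inside the task only because of the Cromwell large-integer issue linked in the comment; the 10^12 value suggests an encoding along the lines of location = chromosome_index * 10^12 + position, although that encoding is inferred here rather than stated anywhere in this commit. A small Java sketch of the same shard-bound arithmetic, producing the values that later become the BigQuery row restriction in ExtractCohortEngine.traverse() (see below):

public class ShardBoundsSketch {
    // Assumed encoding: location = chromosome_index * 10^12 + position (an inference, not from this commit).
    private static final long CHROM_OFFSET = 1_000_000_000_000L; // same value as the WDL's chrom_offset string

    public static void main(String[] args) {
        int maxChromId = 24; // 22 autosomes + X + Y, matching the WDL's max_chrom_id default
        for (int i = 0; i < maxChromId; i++) {
            long chromId = i + 1;                       // the WDL passes chrom_id = i + 1
            long minLocation = chromId * CHROM_OFFSET;  // min_location computed with bc in the command block
            long maxLocation = (chromId + 1) * CHROM_OFFSET;
            System.out.printf("shard %02d: --min-location %d --max-location %d%n",
                    chromId, minLocation, maxLocation);
        }
    }
}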




@@ -197,4 +197,10 @@ public enum ModeEnum {
GENOMES,
ARRAYS
}

public enum OutputType {
TSV,
ORC,
PARQUET
}
}
@@ -69,7 +69,7 @@ public static int getTableNumber(String sampleId, int sampleMod) { // this is based on sample id
public static int getTableNumber(long sampleId, int sampleMod) { // this is based on sample id
// sample ids 1-4000 will go in directory 001
// subtract 1 from the sample id to make it 1-index (or do we want to 0-index?) and add 1 to the dir
int directoryNumber = new Long(Math.floorDiv((sampleId - 1), sampleMod) + 1).intValue(); // TODO omg write some unit tests
int directoryNumber = (int) (Math.floorDiv((sampleId - 1), sampleMod) + 1); // TODO omg write some unit tests
return directoryNumber;
}

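To make the floorDiv change above concrete, here is a small sketch (not a unit test from this commit) of the directory numbers it produces for the boundary sample ids mentioned in the comment, assuming sampleMod = 4000:

public class TableNumberSketch {
    // Mirrors the fixed getTableNumber(long, int) above: sample ids 1..4000 map to directory 1, 4001..8000 to 2, and so on.
    static int getTableNumber(long sampleId, int sampleMod) {
        return (int) (Math.floorDiv(sampleId - 1, sampleMod) + 1);
    }

    public static void main(String[] args) {
        int sampleMod = 4000;
        for (long sampleId : new long[] {1L, 4000L, 4001L, 8000L, 8001L}) {
            System.out.println(sampleId + " -> directory " + getTableNumber(sampleId, sampleMod));
        }
        // Prints: 1 -> 1, 4000 -> 1, 4001 -> 2, 8000 -> 2, 8001 -> 3
    }
}
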
@@ -14,6 +14,7 @@
import org.broadinstitute.hellbender.engine.ReadsContext;
import org.broadinstitute.hellbender.engine.ReferenceContext;
import org.broadinstitute.hellbender.engine.VariantWalker;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.exceptions.UserException;
import org.broadinstitute.hellbender.tools.variantdb.*;
import org.broadinstitute.hellbender.tools.variantdb.IngestConstants;
@@ -22,6 +23,7 @@

import java.util.*;
import java.io.File;
import java.io.IOException;

/**
* Ingest variant walker
@@ -78,6 +80,12 @@ public final class CreateVariantIngestFiles extends VariantWalker {
optional = true)
public CommonCode.ModeEnum mode = CommonCode.ModeEnum.EXOMES;

@Argument(fullName = "output-type",
shortName = "ot",
doc = "[Experimental] Output file format: TSV, ORC or PARQUET [default=TSV].",
optional = true)
public CommonCode.OutputType outputType = CommonCode.OutputType.TSV;

@Argument(
fullName = "ref-version",
doc = "Remove this option!!!! only for ease of testing. Valid options are 37 or 38",
@@ -137,7 +145,7 @@ public void onTraversalStart() {
final GenomeLocSortedSet genomeLocSortedSet = new GenomeLocSortedSet(new GenomeLocParser(seqDictionary));
intervalArgumentGenomeLocSortedSet = GenomeLocSortedSet.createSetFromList(genomeLocSortedSet.getGenomeLocParser(), IntervalUtils.genomeLocsFromLocatables(genomeLocSortedSet.getGenomeLocParser(), intervalArgumentCollection.getIntervals(seqDictionary)));

petTsvCreator = new PetTsvCreator(sampleName, sampleId, tableNumberPrefix, seqDictionary, gqStateToIgnore, outputDir);
petTsvCreator = new PetTsvCreator(sampleName, sampleId, tableNumberPrefix, seqDictionary, gqStateToIgnore, outputDir, outputType);
switch (mode) {
case EXOMES:
case GENOMES:
@@ -177,12 +185,21 @@ public void apply(final VariantContext variant, final ReadsContext readsContext,
if (!variant.isReferenceBlock()) {
vetTsvCreator.apply(variant, readsContext, referenceContext, featureContext);
}
petTsvCreator.apply(variant, intervalsToWrite);
try {
petTsvCreator.apply(variant, intervalsToWrite);
} catch (IOException ioe) {
throw new GATKException("Error writing PET", ioe);
}

}

@Override
public Object onTraversalSuccess() {
petTsvCreator.writeMissingIntervals(intervalArgumentGenomeLocSortedSet);
try {
petTsvCreator.writeMissingIntervals(intervalArgumentGenomeLocSortedSet);
} catch (IOException ioe) {
throw new GATKException("Error writing missing intervals", ioe);
}
return 0;
}

@@ -1,26 +1,14 @@
package org.broadinstitute.hellbender.tools.variantdb.nextgen;

import htsjdk.variant.variantcontext.writer.VariantContextWriter;
import htsjdk.variant.vcf.VCFHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.barclay.argparser.Argument;
import org.broadinstitute.barclay.argparser.CommandLineProgramProperties;
import org.broadinstitute.barclay.help.DocumentedFeature;
import org.broadinstitute.hellbender.cmdline.StandardArgumentDefinitions;
import org.broadinstitute.hellbender.cmdline.programgroups.ShortVariantDiscoveryProgramGroup;
import org.broadinstitute.hellbender.engine.GATKTool;
import org.broadinstitute.hellbender.tools.variantdb.ChromosomeEnum;
import org.broadinstitute.hellbender.tools.variantdb.CommonCode;
import org.broadinstitute.hellbender.tools.variantdb.SampleList;
import org.broadinstitute.hellbender.tools.variantdb.SchemaUtils;
import org.broadinstitute.hellbender.tools.variantdb.arrays.ExtractCohortBQ;
import org.broadinstitute.hellbender.tools.walkers.annotator.Annotation;
import org.broadinstitute.hellbender.tools.walkers.annotator.StandardAnnotation;
import org.broadinstitute.hellbender.tools.walkers.annotator.VariantAnnotatorEngine;
import org.broadinstitute.hellbender.tools.walkers.annotator.allelespecific.AS_StandardAnnotation;
import org.broadinstitute.hellbender.utils.bigquery.TableReference;
import org.broadinstitute.hellbender.utils.io.IOUtils;

import java.util.*;

@@ -49,6 +37,19 @@ public class ExtractCohort extends ExtractTool {
)
private String cohortTable = null;

@Argument(
fullName = "min-location",
doc = "When extracting data, only include locations >= this value",
optional = true
)
private Long minLocation = null;

@Argument(
fullName = "max-location",
doc = "When extracting data, only include locations <= this value",
optional = true
)
private Long maxLocation = null;

@Override
protected void onStartup() {
@@ -68,6 +69,8 @@ protected void onStartup() {
sampleNames,
mode,
cohortTable,
minLocation,
maxLocation,
filteringFQTableName,
localSortMaxRecordsInRam,
printDebugInformation,
@@ -42,6 +42,8 @@ public class ExtractCohortEngine {
private final boolean printDebugInformation;
private final int localSortMaxRecordsInRam;
private final TableReference cohortTableRef;
private final Long minLocation;
private final Long maxLocation;
private final TableReference filteringTableRef;
private final ReferenceDataSource refSource;
private double vqsLodSNPThreshold = 0;
@@ -75,6 +77,8 @@ public ExtractCohortEngine(final String projectID,
final Set<String> sampleNames,
final CommonCode.ModeEnum mode,
final String cohortTableName,
final Long minLocation,
final Long maxLocation,
final String filteringTableName,
final int localSortMaxRecordsInRam,
final boolean printDebugInformation,
@@ -89,8 +93,12 @@ public ExtractCohortEngine(final String projectID,
this.refSource = refSource;
this.sampleNames = sampleNames;
this.mode = mode;

this.cohortTableRef = new TableReference(cohortTableName, SchemaUtils.COHORT_FIELDS);
this.filteringTableRef = new TableReference(filteringTableName, SchemaUtils.YNG_FIELDS);
this.minLocation = minLocation;
this.maxLocation = maxLocation;
this.filteringTableRef = filteringTableName == null || "".equals(filteringTableName) ? null : new TableReference(filteringTableName, SchemaUtils.YNG_FIELDS);

this.printDebugInformation = printDebugInformation;
this.vqsLodSNPThreshold = vqsLodSNPThreshold;
this.vqsLodINDELThreshold = vqsLodINDELThreshold;
@@ -110,7 +118,13 @@ public void traverse() {
if (printDebugInformation) {
logger.debug("using storage api with local sort");
}
final StorageAPIAvroReader storageAPIAvroReader = new StorageAPIAvroReader(cohortTableRef);

String rowRestriction = null;
if (minLocation != null && maxLocation != null) {
rowRestriction = "location >= " + minLocation + " AND location <= " + maxLocation;
}

final StorageAPIAvroReader storageAPIAvroReader = new StorageAPIAvroReader(cohortTableRef, rowRestriction, projectID);
createVariantsFromUngroupedTableResult(storageAPIAvroReader);
break;
case QUERY:
@@ -447,12 +461,17 @@ private VariantContext createVariantContextFromSampleRecord(final GenericRecord
final String genotypeAttributeName = columnName.substring(SchemaUtils.GENOTYPE_FIELD_PREFIX.length());

if ( genotypeAttributeName.equals(VCFConstants.GENOTYPE_KEY) ) {
final List<Allele> genotypeAlleles =
if ("./.".equals(columnValueString)) {
genotypeBuilder.alleles(Arrays.asList(Allele.NO_CALL, Allele.NO_CALL));

} else {
final List<Allele> genotypeAlleles =
Arrays.stream(columnValueString.split("[/|]"))
.map(Integer::parseInt)
.map(alleleIndex -> alleles.get(alleleIndex))
.collect(Collectors.toList());
genotypeBuilder.alleles(genotypeAlleles);
genotypeBuilder.alleles(genotypeAlleles);
}
} else if ( genotypeAttributeName.equals(VCFConstants.GENOTYPE_QUALITY_KEY) ) {
genotypeBuilder.GQ(Integer.parseInt(columnValueString));
} else if ( genotypeAttributeName.equals(VCFConstants.GENOTYPE_PL_KEY) ) {
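
The genotype hunk above is the "support for nocalls" item from the commit message: a literal "./." GT value is now mapped to two NO_CALL alleles instead of being parsed as integer allele indices. A standalone sketch of that parsing logic, using htsjdk directly (the site alleles here are made up for illustration):

import htsjdk.variant.variantcontext.Allele;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GenotypeParseSketch {
    // Mirrors the branch added above: "./." becomes NO_CALL/NO_CALL; anything else is split
    // on '/' or '|' and each index is resolved against the site's allele list.
    static List<Allele> parseGenotype(String gt, List<Allele> alleles) {
        if ("./.".equals(gt)) {
            return Arrays.asList(Allele.NO_CALL, Allele.NO_CALL);
        }
        return Arrays.stream(gt.split("[/|]"))
                .map(Integer::parseInt)
                .map(alleles::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical site alleles: REF=A, ALT=T
        List<Allele> alleles = Arrays.asList(Allele.create("A", true), Allele.create("T", false));
        System.out.println(parseGenotype("0/1", alleles)); // heterozygous A/T
        System.out.println(parseGenotype("./.", alleles)); // two no-call alleles
    }
}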
