Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kc fix type #7620

Merged
merged 3 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsExtractCallset.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ workflow GvsExtractCallset {

String output_file_base_name
String? output_gcs_dir
File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/ah_var_store_20211217/gatk-package-4.2.0.0-448-gff251d6-SNAPSHOT-local.jar"
File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_fix_type_20211222/gatk-package-4.2.0.0-451-gbfb465a-SNAPSHOT-local.jar"
Int local_disk_for_extract = 150

String fq_samples_to_extract_table = "~{data_project}.~{default_dataset}.~{extract_table_prefix}__SAMPLES"
Expand Down
2 changes: 1 addition & 1 deletion scripts/variantstore/wdl/GvsImportGenomes.wdl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ workflow GvsImportGenomes {
Int batch_size = 1

Int? preemptible_tries
File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/ah_var_store_20211217/gatk-package-4.2.0.0-448-gff251d6-SNAPSHOT-local.jar"
File? gatk_override = "gs://broad-dsp-spec-ops/scratch/bigquery-jointcalling/jars/kc_fix_type_20211222/gatk-package-4.2.0.0-451-gbfb465a-SNAPSHOT-local.jar"
String? docker
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,9 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.protobuf.Descriptors;
import htsjdk.samtools.SAMSequenceDictionary;
import htsjdk.samtools.util.RuntimeIOException;
import htsjdk.variant.variantcontext.Allele;
Expand All @@ -36,16 +24,12 @@
import org.broadinstitute.hellbender.tools.gvs.common.GQStateEnum;
import org.broadinstitute.hellbender.tools.gvs.common.IngestConstants;
import org.broadinstitute.hellbender.tools.gvs.common.IngestUtils;
import org.broadinstitute.hellbender.tools.gvs.common.SchemaUtils;
import org.broadinstitute.hellbender.utils.*;
import org.broadinstitute.hellbender.utils.bigquery.BigQueryUtils;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;

/**
* Ingest variant walker
Expand All @@ -61,9 +45,7 @@ public final class CreateVariantIngestFiles extends VariantWalker {

private RefCreator refCreator;
private VetCreator vetCreator;
private enum LoadStatus { STARTED, FINISHED };
private TableName loadStatusTable;
private TableSchema loadStatusTableSchema;
private LoadStatus loadStatus;

private GenomeLocSortedSet intervalArgumentGenomeLocSortedSet;

Expand Down Expand Up @@ -184,32 +166,7 @@ private String getInputFileName() {
return pathParts[pathParts.length - 1];
}

private void writeLoadStatus(LoadStatus status) {

// This uses the _default stream since it (a) commits immediately and (b) doesn't count
// towards the CreateStreamWriter quota
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(loadStatusTable.toString(), loadStatusTableSchema).build()) {

// Create a JSON object that is compatible with the table schema.
JSONArray jsonArr = new JSONArray();
JSONObject jsonObject = new JSONObject();
jsonObject.put("sample_id", Long.parseLong(sampleId));
jsonObject.put("status", status.toString());
jsonObject.put("event_timestamp", System.currentTimeMillis());
jsonArr.put(jsonObject);

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();

logger.info("Load status " + status + " appended successfully");
} catch (ExecutionException | InterruptedException | Descriptors.DescriptorValidationException | IOException e) {
// If the wrapped exception is a StatusRuntimeException, check the state of the operation.
// If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
// https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
throw new GATKException(e.getMessage());
}
}


@Override
public void onTraversalStart() {
Expand Down Expand Up @@ -260,65 +217,23 @@ public void onTraversalStart() {

// check the load status table to see if this sample has already been loaded...
if (outputType == CommonCode.OutputType.BQ) {
loadStatusTable = TableName.of(projectID, datasetName, loadStatusTableName);

BigQuery bigquery = BigQueryUtils.getBigQueryEndPoint(projectID);
Table table = bigquery.getTable(TableId.of(projectID, datasetName, loadStatusTableName));
loadStatusTableSchema = getLoadStatusTableSchema();
loadStatus = new LoadStatus(projectID, datasetName, loadStatusTableName);

StandardTableDefinition tdd = table.getDefinition();
if (BigQueryUtils.getEstimatedRowsInStreamingBuffer(projectID, datasetName, loadStatusTableName) > 0 ) {
logger.info("Found estimated rows in streaming buffer!!! " + tdd.getStreamingBuffer().getEstimatedRows());
long streamingBufferRows = BigQueryUtils.getEstimatedRowsInStreamingBuffer(projectID, datasetName, loadStatusTableName);
if (streamingBufferRows > 0 ) {
logger.info("Found estimated rows in streaming buffer!!! " + streamingBufferRows);
}

verifySampleIsNotLoaded();
}

}

private TableSchema getLoadStatusTableSchema() {
TableSchema.Builder builder = TableSchema.newBuilder();
builder.addFields(
TableFieldSchema.newBuilder().setName(SchemaUtils.SAMPLE_ID_FIELD_NAME).setType(TableFieldSchema.Type.NUMERIC).setMode(TableFieldSchema.Mode.REQUIRED).build()
);
builder.addFields(
TableFieldSchema.newBuilder().setName(SchemaUtils.LOAD_STATUS_FIELD_NAME).setType(TableFieldSchema.Type.STRING).setMode(TableFieldSchema.Mode.REQUIRED).build()
);
builder.addFields(
TableFieldSchema.newBuilder().setName(SchemaUtils.LOAD_STATUS_EVENT_TIMESTAMP_NAME).setType(TableFieldSchema.Type.TIMESTAMP).setMode(TableFieldSchema.Mode.REQUIRED).build()
);
return builder.build();
}

private void verifySampleIsNotLoaded() {
String query = "SELECT " + SchemaUtils.LOAD_STATUS_FIELD_NAME +
" FROM `" + projectID + "." + datasetName + "." + loadStatusTableName + "` " +
" WHERE " + SchemaUtils.SAMPLE_ID_FIELD_NAME + " = " + sampleId;

TableResult result = BigQueryUtils.executeQuery(projectID, query, true, null);

int startedCount = 0;
int finishedCount = 0;
for ( final FieldValueList row : result.iterateAll() ) {
final String status = row.get(0).getStringValue();
if (LoadStatus.STARTED.toString().equals(status)) {
startedCount++;
} else if (LoadStatus.FINISHED.toString().equals(status)) {
finishedCount++;
LoadStatus.LoadState state = loadStatus.getSampleLoadState(Long.parseLong(sampleId));
if (state == LoadStatus.LoadState.COMPLETE) {
logger.info("Sample id " + sampleId + " was detected as already loaded, exiting successfully.");
System.exit(0);
} else if (state == LoadStatus.LoadState.PARTIAL) {
throw new GATKException("Sample Id " + sampleId + " has already been partially loaded!");
}

}

// if fully loaded, exit successfully!
if (startedCount == 1 && finishedCount == 1) {
logger.info("Sample id " + sampleId + " was detected as already loaded, exiting successfully.");
System.exit(0);
}

// otherwise if there are any records, exit
if (startedCount > 0 || finishedCount > 0) {
throw new GATKException("Sample Id " + sampleId + " has already been partially loaded!");
}
}

@Override
Expand Down Expand Up @@ -366,7 +281,7 @@ public void apply(final VariantContext variant, final ReadsContext readsContext,
@Override
public Object onTraversalSuccess() {
if (outputType == CommonCode.OutputType.BQ) {
writeLoadStatus(LoadStatus.STARTED);
loadStatus.writeLoadStatusStarted(Long.parseLong(sampleId));
}

if (refCreator != null) {
Expand All @@ -385,7 +300,7 @@ public Object onTraversalSuccess() {

// upload the load status table
if (outputType == CommonCode.OutputType.BQ) {
writeLoadStatus(LoadStatus.FINISHED);
loadStatus.writeLoadStatusFinished(Long.parseLong(sampleId));
}

return 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package org.broadinstitute.hellbender.tools.gvs.ingest;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1beta2.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1beta2.TableName;
import com.google.cloud.bigquery.storage.v1beta2.TableSchema;
import com.google.protobuf.Descriptors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;
import org.broadinstitute.hellbender.tools.gvs.common.SchemaUtils;
import org.broadinstitute.hellbender.utils.bigquery.BigQueryUtils;
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

/**
 * Tracks per-sample ingest progress in a BigQuery "load status" table.
 *
 * <p>Each sample gets a STARTED row when ingest begins and a FINISHED row when it
 * completes; {@link #getSampleLoadState(long)} derives an aggregate state from the
 * rows present. Rows are appended through the BigQuery Storage Write API's
 * {@code _default} stream so they commit immediately.
 */
public class LoadStatus {
    static final Logger logger = LogManager.getLogger(LoadStatus.class);

    /** Row-level status values written to the load status table. */
    private enum LoadStatusValues { STARTED, FINISHED };

    /** Aggregate load state for a sample, derived from its status rows. */
    public enum LoadState { NONE, PARTIAL, COMPLETE };

    private final String projectID;
    private final String datasetName;
    private final String loadStatusTableName;
    private final TableName loadStatusTable;

    /**
     * @param projectID           GCP project containing the dataset
     * @param datasetName         BigQuery dataset containing the load status table
     * @param loadStatusTableName unqualified name of the load status table
     */
    public LoadStatus(String projectID, String datasetName, String loadStatusTableName) {
        this.projectID = projectID;
        this.datasetName = datasetName;
        this.loadStatusTableName = loadStatusTableName;
        this.loadStatusTable = TableName.of(projectID, datasetName, loadStatusTableName);
    }

    /**
     * Builds the write-API schema for the load status table:
     * (sample_id INT64 REQUIRED, status STRING REQUIRED, event_timestamp TIMESTAMP REQUIRED).
     */
    public TableSchema getLoadStatusTableSchema() {
        TableSchema.Builder builder = TableSchema.newBuilder();
        builder.addFields(
                TableFieldSchema.newBuilder().setName(SchemaUtils.SAMPLE_ID_FIELD_NAME).setType(TableFieldSchema.Type.INT64).setMode(TableFieldSchema.Mode.REQUIRED).build()
        );
        builder.addFields(
                TableFieldSchema.newBuilder().setName(SchemaUtils.LOAD_STATUS_FIELD_NAME).setType(TableFieldSchema.Type.STRING).setMode(TableFieldSchema.Mode.REQUIRED).build()
        );
        builder.addFields(
                TableFieldSchema.newBuilder().setName(SchemaUtils.LOAD_STATUS_EVENT_TIMESTAMP_NAME).setType(TableFieldSchema.Type.TIMESTAMP).setMode(TableFieldSchema.Mode.REQUIRED).build()
        );
        return builder.build();
    }

    /**
     * Queries the load status table for this sample's rows and classifies them.
     *
     * @param sampleId sample to look up
     * @return {@link LoadState#COMPLETE} for exactly one STARTED and one FINISHED row,
     *         {@link LoadState#NONE} when no rows exist, {@link LoadState#PARTIAL} otherwise
     */
    public LoadState getSampleLoadState(long sampleId) {
        String query = "SELECT " + SchemaUtils.LOAD_STATUS_FIELD_NAME +
                " FROM `" + projectID + "." + datasetName + "." + loadStatusTableName + "` " +
                " WHERE " + SchemaUtils.SAMPLE_ID_FIELD_NAME + " = " + sampleId;

        TableResult result = BigQueryUtils.executeQuery(projectID, query, true, null);

        int startedCount = 0;
        int finishedCount = 0;
        for ( final FieldValueList row : result.iterateAll() ) {
            final String status = row.get(0).getStringValue();
            if (LoadStatusValues.STARTED.toString().equals(status)) {
                startedCount++;
            } else if (LoadStatusValues.FINISHED.toString().equals(status)) {
                finishedCount++;
            }

        }

        // if fully loaded, exit successfully!
        if (startedCount == 1 && finishedCount == 1) {
            return LoadState.COMPLETE;
        }

        if (startedCount == 0 && finishedCount == 0) {
            return LoadState.NONE;
        }

        // otherwise if there are any records, return partial
        return LoadState.PARTIAL;
    }

    /** Records that ingest of the given sample has begun. */
    public void writeLoadStatusStarted(long sampleId) {
        writeLoadStatus(LoadStatusValues.STARTED, sampleId);
    }

    /** Records that ingest of the given sample has completed. */
    public void writeLoadStatusFinished(long sampleId) {
        writeLoadStatus(LoadStatusValues.FINISHED, sampleId);
    }

    /**
     * Appends one (sample_id, status, event_timestamp) row to the load status table.
     *
     * @throws GATKException wrapping any failure from the write path, with the cause preserved
     */
    protected void writeLoadStatus(LoadStatusValues status, long sampleId) {

        // This uses the _default stream since it (a) commits immediately and (b) doesn't count
        // towards the CreateStreamWriter quota
        try (JsonStreamWriter writer =
                     JsonStreamWriter.newBuilder(loadStatusTable.toString(), getLoadStatusTableSchema()).build()) {

            // Create a JSON object that is compatible with the table schema.
            JSONArray jsonArr = new JSONArray();
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("sample_id", sampleId);
            jsonObject.put("status", status.toString());
            jsonObject.put("event_timestamp", System.currentTimeMillis() * 1000L); // google wants this in microseconds since epoch...
            jsonArr.put(jsonObject);

            ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
            future.get(); // block until the append commits; throws on failure

            logger.info("Load status " + status + " appended successfully");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new GATKException("Interrupted while writing load status " + status + " for sample_id " + sampleId, e);
        } catch (ExecutionException | Descriptors.DescriptorValidationException | IOException e) {
            // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
            // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
            // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
            throw new GATKException("Failed writing load status " + status + " for sample_id " + sampleId, e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.google.cloud.bigquery.storage.v1beta2.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.broadinstitute.hellbender.exceptions.GATKException;

public class PendingBQWriter extends CommittedBQWriter {
static final Logger logger = LogManager.getLogger(PendingBQWriter.class);
Expand All @@ -15,7 +16,7 @@ public void flushBuffer() {
try {
writeJsonArray(0);
} catch (Exception ex) {
logger.error("Caught exception writing last records on close", ex);
throw new GATKException("Caught exception writing last records on close of " + writeStream.getName(), ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package org.broadinstitute.hellbender.utils.bigquery;

import org.broadinstitute.hellbender.GATKBaseTest;
import org.broadinstitute.hellbender.tools.gvs.common.SchemaUtils;
import org.broadinstitute.hellbender.tools.gvs.ingest.LoadStatus;
import org.testng.Assert;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.UUID;

/**
 * Cloud integration test for {@link LoadStatus}: creates a throwaway BigQuery
 * load-status table, drives a sample through NONE -> PARTIAL -> COMPLETE, and
 * drops the table afterwards. Runs only in the "cloud" test group.
 */
public class LoadStatusBQTest extends GATKBaseTest {

    private static final String BIGQUERY_TEST_PROJECT = "broad-dsde-dev";
    private static final String BIGQUERY_TEST_DATASET = "gatk_bigquery_test_dataset";

    // Unique table name per run so concurrent test executions don't collide.
    private static final String uuid = UUID.randomUUID().toString().replace('-', '_');
    private static final String TEMP_TABLE_NAME = "sample_load_status_test_" + uuid;

    private static final String FQ_TEMP_TABLE_NAME =
            BIGQUERY_TEST_PROJECT + "." + BIGQUERY_TEST_DATASET + "." + TEMP_TABLE_NAME;

    /** Creates the temporary load-status table with the expected three columns. */
    @BeforeTest(groups = {"cloud"})
    public void beforeTest() {
        final String ddl = "CREATE TABLE " + FQ_TEMP_TABLE_NAME + " (" +
                SchemaUtils.SAMPLE_ID_FIELD_NAME + " INT64, " +
                SchemaUtils.LOAD_STATUS_FIELD_NAME + " STRING, " +
                SchemaUtils.LOAD_STATUS_EVENT_TIMESTAMP_NAME + " TIMESTAMP " +
                ")";
        BigQueryUtils.executeQuery(ddl, new HashMap<>());
    }

    /** Drops the temporary table so the dataset stays clean. */
    @AfterTest(groups = {"cloud"})
    public void afterTest() {
        BigQueryUtils.executeQuery("DROP TABLE " + FQ_TEMP_TABLE_NAME, new HashMap<>());
    }

    /** Walks one sample through the full status lifecycle and spot-checks an untouched sample. */
    @Test(groups = {"cloud"})
    public void testUpdateStatus() {
        final LoadStatus tracker = new LoadStatus(BIGQUERY_TEST_PROJECT, BIGQUERY_TEST_DATASET, TEMP_TABLE_NAME);
        final long loadedSample = 1;
        final long untouchedSample = 2;

        // Nothing written yet.
        Assert.assertEquals(tracker.getSampleLoadState(loadedSample), LoadStatus.LoadState.NONE);

        // STARTED alone is a partial load.
        tracker.writeLoadStatusStarted(loadedSample);
        Assert.assertEquals(tracker.getSampleLoadState(loadedSample), LoadStatus.LoadState.PARTIAL);

        // STARTED + FINISHED is a complete load.
        tracker.writeLoadStatusFinished(loadedSample);
        Assert.assertEquals(tracker.getSampleLoadState(loadedSample), LoadStatus.LoadState.COMPLETE);

        // A different sample id is unaffected.
        Assert.assertEquals(tracker.getSampleLoadState(untouchedSample), LoadStatus.LoadState.NONE);

    }
}