Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job Add labels to BQ operations from GATK (Issues-199) #7115

Merged
merged 16 commits into from
Mar 19, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.broadinstitute.hellbender.utils.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.bigquery.TableReference;


public class SampleList {
static final Logger logger = LogManager.getLogger(SampleList.class);

Expand Down Expand Up @@ -81,8 +82,9 @@ private TableResult querySampleTable(String fqSampleTableName, String whereClaus
"SELECT " + SchemaUtils.SAMPLE_ID_FIELD_NAME + ", " + SchemaUtils.SAMPLE_NAME_FIELD_NAME +
" FROM `" + fqSampleTableName + "`" + whereClause;

// Execute the query:
final TableResult result = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(executionProjectId) , sampleListQueryString, false);

// Execute the query:
final TableResult result = BigQueryUtils.executeQuery(sampleListQueryString, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this back to the invocation that supplied the projectId, but also includes your null for the labels. e.g.

final TableResult result = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(executionProjectId) , sampleListQueryString, false, null);


// Show our pretty results:
if (printDebugInformation) {
Expand All @@ -93,5 +95,4 @@ private TableResult querySampleTable(String fqSampleTableName, String whereClaus

return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void traverse() {
}
// create the query string
String q = "SELECT " + StringUtils.join(SchemaUtils.COHORT_FIELDS,",") + " FROM " + cohortTableRef.getFQTableName() + " ORDER BY " + SchemaUtils.LOCATION_FIELD_NAME;
TableResult tr = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(), cohortTableRef.tableProject, cohortTableRef.tableDataset, q);
TableResult tr = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(), cohortTableRef.tableProject, cohortTableRef.tableDataset, q, null);
createVariantsFromSortedTableResults(tr, fullVqsLodMap, fullYngMap, noFilteringRequested);
break;
}
Expand Down Expand Up @@ -328,8 +328,8 @@ private double getQUALapproxFromSampleRecord(GenericRecord sampleRecord) {
// Non-AS QualApprox (used for qualapprox filter) is simply the sum of the AS values (see GnarlyGenotyper)
if (s.contains("|")) {

// take the sum of all non-* alleles
// basically if our alleles are '*,T' or 'G,*' we want to ignore the * part
// take the average of all non-* alleles
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did this change? I don't think it's an average

Copy link
Author

@Marianie-Alphonse Marianie-Alphonse Mar 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, why that changed

// basically if our alleles are '*,T' or 'G,*' we want to ignore the * part
String[] alleles = sampleRecord.get(SchemaUtils.ALT_ALLELE_FIELD_NAME).toString().split(",");
String[] parts = s.split("\\|");

Expand Down Expand Up @@ -389,7 +389,7 @@ private void processSampleRecordsForLocation(final long location, final Iterable

totalAsQualApprox += getQUALapproxFromSampleRecord(sampleRecord);

// hasSnpAllele should be set to true if any sample has at least one snp (gnarly definition here)
// hasSnpAllele should be set to true if any sample has at least one snp (gnarly definition here)
boolean thisHasSnp = vc.getAlternateAlleles().stream().anyMatch(allele -> allele != Allele.SPAN_DEL && allele.length() == vc.getReference().length());
// logger.info("\t" + contig + ":" + currentPosition + ": calculated thisHasSnp of " + thisHasSnp + " from " + vc.getAlternateAlleles() + " and ref " + vc.getReference());
hasSnpAllele = hasSnpAllele || thisHasSnp;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,7 @@
import org.broadinstitute.hellbender.tools.walkers.genotyper.GenotypeCalculationArgumentCollection;
import org.broadinstitute.hellbender.utils.GenotypeCounts;
import org.broadinstitute.hellbender.utils.QualityUtils;
import org.broadinstitute.hellbender.utils.bigquery.BigQueryUtils;
import org.broadinstitute.hellbender.utils.bigquery.GATKAvroReader;
import org.broadinstitute.hellbender.utils.bigquery.StorageAPIAvroReader;
import org.broadinstitute.hellbender.utils.bigquery.TableReference;
import org.broadinstitute.hellbender.utils.bigquery.*;
import org.broadinstitute.hellbender.utils.localsort.AvroSortingCollection;
import org.broadinstitute.hellbender.utils.localsort.SortingCollection;
import org.broadinstitute.hellbender.utils.variant.HomoSapiensConstants;
Expand Down Expand Up @@ -100,12 +97,11 @@ public ExtractFeaturesEngine(final String projectID,
public void traverse() {


final String featureQueryString =
final String featureQueryString =

ExtractFeaturesBQ.getVQSRFeatureExtractQueryString(altAlleleTable, sampleListTable, minLocation, maxLocation, trainingSitesOnly, SNP_QUAL_THRESHOLD, INDEL_QUAL_THRESHOLD);

logger.info(featureQueryString);
final StorageAPIAvroReader storageAPIAvroReader = BigQueryUtils.executeQueryWithStorageAPI(featureQueryString, SchemaUtils.FEATURE_EXTRACT_FIELDS, projectID, useBatchQueries);
final StorageAPIAvroReader storageAPIAvroReader = BigQueryUtils.executeQueryWithStorageAPI(featureQueryString, SchemaUtils.FEATURE_EXTRACT_FIELDS, projectID, useBatchQueries, null);

createVQSRInputFromTableResult(storageAPIAvroReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,25 @@ public static BigQuery getBigQueryEndPoint(String executionProjectId) {
* Will block until results are returned.
* For more information on querying BigQuery tables, see: https://cloud.google.com/bigquery/sql-reference/
* @param queryString The {@link BigQuery} query string to execute. Must use standard SQL syntax. Must contain the project ID, data set, and table name in the `FROM` clause for the table from which to retrieve data.
* @param labels The {@link BigQuery} labels to add to the job run. Must be a Map<String, String>. Can be null to indicate no labels.
* @return A {@link TableResult} object containing the results of the query executed.
*/
public static TableResult executeQuery(final String queryString) {
return executeQuery(getBigQueryEndPoint(), queryString, false);
public static TableResult executeQuery(final String queryString, final Map<String, String> labels) {
Marianie-Alphonse marked this conversation as resolved.
Show resolved Hide resolved
return executeQuery(getBigQueryEndPoint(), queryString, false, labels );
}

/**
* Executes the given {@code queryString} on the default instance of {@link BigQuery} as created by {@link #getBigQueryEndPoint()}.
* Will block until results are returned.
* For more information on querying BigQuery tables, see: https://cloud.google.com/bigquery/sql-reference/
* @param queryString The {@link BigQuery} query string to execute. Must use standard SQL syntax. Must contain the project ID, data set, and table name in the `FROM` clause for the table from which to retrieve data.
* @param labels The {@link BigQuery} labels to add to the job run. Must be a Map<String, String>. Can be null to indicate no labels.
* @param runQueryInBatchMode If true, run the query in batch mode, which is lower priority but has no limit on the number of concurrent queries
* @return A {@link TableResult} object containing the results of the query executed.
*/
public static TableResult executeQuery(final String queryString, final boolean runQueryInBatchMode) {
return executeQuery(getBigQueryEndPoint(), queryString, runQueryInBatchMode);

public static TableResult executeQuery(final String queryString, final boolean runQueryInBatchMode, final Map<String, String> labels) {
return executeQuery(getBigQueryEndPoint(), queryString, runQueryInBatchMode, labels);
}

/**
Expand All @@ -79,15 +82,17 @@ public static TableResult executeQuery(final String queryString, final boolean r
* @param bigQuery The {@link BigQuery} instance against which to execute the given {@code queryString}.
* @param queryString The {@link BigQuery} query string to execute. Must use standard SQL syntax. Must contain the project ID, data set, and table name in the `FROM` clause for the table from which to retrieve data.
* @param runQueryInBatchMode If true, run the query in batch mode, which is lower priority but has no limit on the number of concurrent queries
* @param labels The {@link BigQuery} labels to add to the job run. Must be a Map<String, String>. Can be null to indicate no labels.
* @return A {@link TableResult} object containing the results of the query executed.
*/
public static TableResult executeQuery(final BigQuery bigQuery, final String queryString, final boolean runQueryInBatchMode) {
public static TableResult executeQuery(final BigQuery bigQuery, final String queryString, final boolean runQueryInBatchMode, final Map<String, String> labels) {
Marianie-Alphonse marked this conversation as resolved.
Show resolved Hide resolved

// Create a query configuration we can run based on our query string:
final QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder( queryString )
.setUseLegacySql(false)
.setPriority(runQueryInBatchMode ? QueryJobConfiguration.Priority.BATCH : QueryJobConfiguration.Priority.INTERACTIVE)
.setLabels(labels)
Marianie-Alphonse marked this conversation as resolved.
Show resolved Hide resolved
.build();

logger.info("Executing Query: \n\n" + queryString);
Expand All @@ -104,18 +109,21 @@ public static TableResult executeQuery(final BigQuery bigQuery, final String que
* @param projectID The BigQuery {@code project ID} containing the {@code dataSet} and table from which to query data.
* @param dataSet The BigQuery {@code dataSet} containing the table from which to query data.
* @param queryString The {@link BigQuery} query string to execute. Must use standard SQL syntax. Must contain the project ID, data set, and table ID in the `FROM` clause for the table from which to retrieve data.
* @param labels The {@link BigQuery} labels to add to the job run. Must be a Map<String, String>. Can be null to indicate no labels.
* @return A {@link TableResult} object containing the results of the query executed.
*/
public static TableResult executeQuery(final BigQuery bigQuery,
final String projectID,
final String dataSet,
final String queryString) {
final String queryString,
final Map<String, String> labels) {

// Create a query configuration we can run based on our query string:
final QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder( queryString )
.setUseLegacySql(false)
.setDefaultDataset(DatasetId.of(projectID, dataSet))
.setLabels(labels)
.build();

return submitQueryAndWaitForResults( bigQuery, queryConfig );
Expand Down Expand Up @@ -374,12 +382,12 @@ private static long getQueryCostBytesProcessedEstimate(String queryString) {
return bytesProcessed;
}

public static StorageAPIAvroReader executeQueryWithStorageAPI(final String queryString, final List<String> fieldsToRetrieve, final String projectID) {
public static StorageAPIAvroReader executeQueryWithStorageAPI(final String queryString, final List<String> fieldsToRetrieve, final String projectID, Map<String, String> labels) {

return executeQueryWithStorageAPI(queryString, fieldsToRetrieve, projectID, false);
return executeQueryWithStorageAPI(queryString, fieldsToRetrieve, projectID, false, labels);
}

public static StorageAPIAvroReader executeQueryWithStorageAPI(final String queryString, final List<String> fieldsToRetrieve, final String projectID, final boolean runQueryInBatchMode) {
public static StorageAPIAvroReader executeQueryWithStorageAPI(final String queryString, final List<String> fieldsToRetrieve, final String projectID, final boolean runQueryInBatchMode, Map<String, String> labels) {
final String tempTableDataset = "temp_tables";
final String tempTableName = UUID.randomUUID().toString().replace('-', '_');
final String tempTableFullyQualified = String.format("%s.%s.%s", projectID, tempTableDataset, tempTableName);
Expand All @@ -393,7 +401,7 @@ public static StorageAPIAvroReader executeQueryWithStorageAPI(final String query
") AS\n" +
queryString;

executeQuery(queryStringIntoTempTable, runQueryInBatchMode);
executeQuery(queryStringIntoTempTable, runQueryInBatchMode, labels);

final Table tableInfo = getBigQueryEndPoint().getTable( TableId.of(projectID, tempTableDataset, tempTableName) );
logger.info(String.format("Query temp table created with %s rows and %s bytes in size", tableInfo.getNumRows(), tableInfo.getNumBytes()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.*;

/**
Expand All @@ -22,12 +21,14 @@ public class BigQueryUtilsUnitTest extends GATKBaseTest {
private static final String BIGQUERY_FULLY_QUALIFIED_TABLE = String.format("%s.%s.%s",
BIGQUERY_TEST_PROJECT, BIGQUERY_TEST_DATASET, BIGQUERY_TEST_TABLE);

private static final UUID runUuid = UUID.randomUUID();

@Test(groups = {"cloud"})
public void testExecuteQueryAllRecords() {
final String query = String.format("SELECT * FROM `%s`", BIGQUERY_FULLY_QUALIFIED_TABLE);

final TableResult result = BigQueryUtils.executeQuery(query);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "get_all_records" + runUuid);
final TableResult result = BigQueryUtils.executeQuery(query, labels);

checkQueryResults(result, getAllExpectedNamesAndAges(), query);
}
Expand All @@ -38,26 +39,29 @@ public void testExecuteQueryWithWhereClause() {
expectedNamesAndAges.put("Fred", "35");

final String query = String.format("SELECT * FROM `%s` WHERE name = 'Fred'", BIGQUERY_FULLY_QUALIFIED_TABLE);

final TableResult result = BigQueryUtils.executeQuery(query);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "test_where_clause" + runUuid);
final TableResult result = BigQueryUtils.executeQuery(query, labels);

checkQueryResults(result, expectedNamesAndAges, query);
}

@Test(groups = {"cloud"})
public void testExecuteQueryInBatchMode() {
final String query = String.format("SELECT * FROM `%s`", BIGQUERY_FULLY_QUALIFIED_TABLE);

final TableResult result = BigQueryUtils.executeQuery(query, true);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "test_batch_mode" + runUuid);
final TableResult result = BigQueryUtils.executeQuery(query, true, labels);

checkQueryResults(result, getAllExpectedNamesAndAges(), query);
}

@Test(groups = {"cloud"})
public void testSpecifiedExecuteQuery() {
final String query = String.format("SELECT * FROM `%s`", BIGQUERY_TEST_TABLE);

final TableResult result = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(), BIGQUERY_TEST_PROJECT, BIGQUERY_TEST_DATASET, query);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "test_specified_execute_query" + runUuid);
final TableResult result = BigQueryUtils.executeQuery(BigQueryUtils.getBigQueryEndPoint(), BIGQUERY_TEST_PROJECT, BIGQUERY_TEST_DATASET, query, labels);

checkQueryResults(result, getAllExpectedNamesAndAges(), query);
}
Expand All @@ -70,8 +74,9 @@ public void testQueryWithStorageAPI() {

final List<String> fieldsToRetrieve = new LinkedList<>();
fieldsToRetrieve.add("name");

final StorageAPIAvroReader result = BigQueryUtils.executeQueryWithStorageAPI(query, fieldsToRetrieve, BIGQUERY_TEST_PROJECT);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "test_storage_api" + runUuid);
final StorageAPIAvroReader result = BigQueryUtils.executeQueryWithStorageAPI(query, fieldsToRetrieve, BIGQUERY_TEST_PROJECT, labels);

int rowCount = 0;
final Set<String> retrievedNames = new HashSet<>();
Expand Down Expand Up @@ -103,8 +108,9 @@ public void testQueryWithEmptyDatasetStorageAPI() {

final List<String> fieldsToRetrieve = new LinkedList<>();
fieldsToRetrieve.add("name");

final StorageAPIAvroReader result = BigQueryUtils.executeQueryWithStorageAPI(query, fieldsToRetrieve, BIGQUERY_TEST_PROJECT);
Map<String, String> labels = new HashMap<String, String>();
labels.put("test_query", "test_storage_api_with_empty_dataset" + runUuid);
final StorageAPIAvroReader result = BigQueryUtils.executeQueryWithStorageAPI(query, fieldsToRetrieve, BIGQUERY_TEST_PROJECT, labels);

int rowCount = 0;
final Set<String> retrievedNames = new HashSet<>();
Expand All @@ -116,6 +122,15 @@ public void testQueryWithEmptyDatasetStorageAPI() {
Assert.assertTrue(retrievedNames.isEmpty(), "No Result expected");
}

@Test(groups = {"cloud"})
public void testQueryWithNullLabel() {
final String query = String.format("SELECT * FROM `%s`", BIGQUERY_FULLY_QUALIFIED_TABLE);
Map<String, String> labels = null;
final TableResult result = BigQueryUtils.executeQuery(query, labels);

checkQueryResults(result, getAllExpectedNamesAndAges(), query);
}

private Map<String, String> getAllExpectedNamesAndAges() {
final Map<String, String> expectedNamesAndAges = new HashMap<>();
expectedNamesAndAges.put("Fred", "35");
Expand Down