Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade bq libraries #7264

Merged
merged 2 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ final hadoopVersion = System.getProperty('hadoop.version', '3.2.1')
final disqVersion = System.getProperty('disq.version','0.3.6')

final genomicsdbVersion = System.getProperty('genomicsdb.version','1.3.2')
final bigQueryVersion = System.getProperty('bigQuery.version', '1.117.1')
final bigQueryVersion = System.getProperty('bigQuery.version', '1.131.1')
final bigQueryStorageVersion = System.getProperty('bigQueryStorage.version', '1.21.0')

final guavaVersion = System.getProperty('guava.version', '27.1-jre')
final testNGVersion = '7.0.0'
Expand Down Expand Up @@ -274,7 +275,7 @@ dependencies {
}

compile 'com.google.cloud:google-cloud-bigquery:' + bigQueryVersion
compile 'com.google.cloud:google-cloud-bigquerystorage:1.5.1'
compile 'com.google.cloud:google-cloud-bigquerystorage:' + bigQueryStorageVersion

compile "gov.nist.math.jama:gov.nist.math.jama:1.1.1"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package org.broadinstitute.hellbender.utils.bigquery;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.storage.v1beta1.*;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions.Builder;
import com.google.common.base.Preconditions;
import com.google.cloud.bigquery.storage.v1.AvroRows;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions.Builder;

import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
Expand All @@ -22,11 +27,9 @@ public class StorageAPIAvroReader implements GATKAvroReader {

private static final Logger logger = LogManager.getLogger(StorageAPIAvroReader.class);

private static int rowCount = 0;
private BigQueryReadClient client;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that Google renamed the Storage API to the Read API, should we rename this class? Not super important.


private BigQueryStorageClient client;

private Iterator<Storage.ReadRowsResponse> serverStream;
private Iterator<ReadRowsResponse> serverStream;

private org.apache.avro.Schema schema;

Expand All @@ -36,7 +39,7 @@ public class StorageAPIAvroReader implements GATKAvroReader {
// collection.
private BinaryDecoder decoder = null;

private AvroProto.AvroRows currentAvroRows;
private AvroRows currentAvroRows;

// GenericRecord object will be reused.
private GenericRecord nextRow = null;
Expand All @@ -54,32 +57,39 @@ public StorageAPIAvroReader(final TableReference tableRef, final String rowRestr
try {
logger.info("Using Storage API from " + tableRef.getFQTableName() + " with '" + rowRestriction + "'");

this.client = BigQueryStorageClient.create();
this.client = BigQueryReadClient.create();

final String parent = String.format("projects/%s", parentProjectId == null || parentProjectId.isEmpty() ? tableRef.tableProject : parentProjectId);

final TableReferenceProto.TableReference tableReference = TableReferenceProto.TableReference.newBuilder()
.setProjectId(tableRef.tableProject)
.setDatasetId(tableRef.tableDataset)
.setTableId(tableRef.tableName)
.build();
final String srcTable =
String.format(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there no longer `TableReference` objects, just table path strings? Doing this with a builder pattern feels cleaner.

"projects/%s/datasets/%s/tables/%s",
tableRef.tableProject, tableRef.tableDataset, tableRef.tableName);

Builder readOptions = ReadOptions.TableReadOptions.newBuilder()
.addAllSelectedFields(tableRef.fields);
Builder readOptions =
ReadSession.TableReadOptions.newBuilder()
.addAllSelectedFields(tableRef.fields);

if (rowRestriction != null) {
readOptions.setRowRestriction(rowRestriction);
}
final ReadOptions.TableReadOptions tableReadOptions = readOptions.build();

final Storage.CreateReadSessionRequest.Builder builder = Storage.CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setTableReference(tableReference)
.setReadOptions(tableReadOptions)
.setRequestedStreams(1)
.setFormat(Storage.DataFormat.AVRO);

final Storage.ReadSession session = client.createReadSession(builder.build());
final TableReadOptions tableReadOptions = readOptions.build();

// Start specifying the read session we want created.
ReadSession.Builder sessionBuilder =
ReadSession.newBuilder()
.setTable(srcTable)
.setDataFormat(DataFormat.AVRO)
.setReadOptions(tableReadOptions);

// Begin building the session creation request.
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parent)
.setReadSession(sessionBuilder)
.setMaxStreamCount(1);

final ReadSession session = client.createReadSession(builder.build());
if (session.getStreamsCount() > 0) {

this.schema = new org.apache.avro.Schema.Parser().parse(session.getAvroSchema().getSchema());
Expand All @@ -90,12 +100,10 @@ public StorageAPIAvroReader(final TableReference tableRef, final String rowRestr
logger.info("Storage API Session ID: " + session.getName());

// Use the first stream to perform reading.
Storage.StreamPosition readPosition = Storage.StreamPosition.newBuilder()
.setStream(session.getStreams(0))
.build();
String streamName = session.getStreams(0).getName();

Storage.ReadRowsRequest readRowsRequest = Storage.ReadRowsRequest.newBuilder()
.setReadPosition(readPosition)
ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder()
.setReadStream(streamName)
.build();

this.serverStream = client.readRowsCallable().call(readRowsRequest).iterator();
Expand Down