Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use BigQuery Read API for reading external BigLake tables #22974

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -643,11 +643,13 @@ jobs:
env:
BIGQUERY_CREDENTIALS_KEY: ${{ secrets.BIGQUERY_CREDENTIALS_KEY }}
GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }}
BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID: ${{ vars.BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID }}
if: matrix.modules == 'plugin/trino-bigquery' && !contains(matrix.profile, 'cloud-tests-2') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.BIGQUERY_CREDENTIALS_KEY != '')
run: |
$MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-1 \
-Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}" \
-Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}"
-Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" \
-Dtesting.bigquery-connection-id="${BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID}"
- name: Cloud BigQuery Smoke Tests
id: tests-bq-smoke
env:
Expand Down
11 changes: 9 additions & 2 deletions plugin/trino-bigquery/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,16 @@ You can follow the steps below to be able to run the integration tests locally.
**BigQuery Admin** role assigned.
* Get the base64 encoded text of the service account credentials file using `base64
/path/to/service_account_credentials.json`.
* Create a new BigQuery `CLOUD_RESOURCE` connection and grant the connection service account GCS permissions.
[Documentation](https://cloud.google.com/bigquery/docs/create-cloud-resource-connection).
* `bq mk --connection --location=us --project_id=$PROJECT_ID --connection_type=CLOUD_RESOURCE $CONNECTION_ID`
* Now we need to grant the new connection's service account the GCS permissions.
* To do this, run `bq show --connection $PROJECT_ID.us.$CONNECTION_ID`, which will display the service account ID.
* Grant the service account GCS Object Viewer permissions, e.g.: `gsutil iam ch serviceAccount:$SERVICE_ACCOUNT_ID:objectViewer gs://DESTINATION_BUCKET_NAME`, where `$SERVICE_ACCOUNT_ID` is the service account ID displayed by the previous command.

* The `TestBigQueryWithDifferentProjectIdConnectorSmokeTest` requires an alternate project ID that differs from the
project ID attached to the service account, but that the service account can still access.
* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, and `testing.alternate-bq-project-id` in the IntelliJ "Run Configuration"
* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, `testing.alternate-bq-project-id`, and `testing.bigquery-connection-id` in the IntelliJ "Run Configuration"
(or on the CLI if using Maven directly). It should look something like
`-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate`.
`-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate -Dtesting.bigquery-connection-id=my_project.us.connection-id`.
* Run any test of your choice.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.ExternalTableDefinition;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
Expand Down Expand Up @@ -119,6 +120,7 @@
import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand Down Expand Up @@ -341,7 +343,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
return null;
}

boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType());
boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition());
ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi));
Optional<BigQueryPartitionType> partitionType = getPartitionType(tableInfo.get().getDefinition());
Expand All @@ -362,14 +364,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
.withProjectedColumns(columns.build());
}

private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition tableDefinition)
{
TableDefinition.Type type = tableDefinition.getType();
if (isWildcardTable(type, tableName)) {
// Storage API doesn't support reading wildcard tables
return false;
}
if (type == EXTERNAL) {
// Storage API doesn't support reading external tables
if (type == EXTERNAL && !isBigLakeTable(tableDefinition)) {
// Storage API doesn't support reading external tables except BigLake tables
return false;
}
if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
Expand All @@ -378,6 +381,18 @@ private static boolean useStorageApi(ConnectorSession session, String tableName,
return true;
}

/**
 * Tells whether the given table definition describes a BigLake table.
 * <p>
 * A definition is treated as BigLake when it is an external table definition that:
 * <ul>
 * <li>has a non-empty connection id,</li>
 * <li>has no object metadata (a definition with object metadata is an object table), and</li>
 * <li>has a non-null list of source URIs that all start with {@code gs://}
 * (any other scheme indicates an OMNI table).</li>
 * </ul>
 *
 * @param tableDefinition the definition to inspect
 * @return {@code true} when all of the conditions above hold, {@code false} otherwise
 */
private static boolean isBigLakeTable(TableDefinition tableDefinition)
{
    if (!(tableDefinition instanceof ExternalTableDefinition external)) {
        // Only external table definitions can be BigLake tables
        return false;
    }
    if (isNullOrEmpty(external.getConnectionId())) {
        // No connection attached to the external table
        return false;
    }
    if (!isNullOrEmpty(external.getObjectMetadata())) {
        // Object metadata is the object table discriminator
        return false;
    }
    List<String> uris = external.getSourceUris();
    if (uris == null) {
        return false;
    }
    for (String uri : uris) {
        if (!uri.startsWith("gs://")) {
            // A non-GCS location is the OMNI table discriminator
            return false;
        }
    }
    return true;
}

private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
{
BigQueryClient client = bigQueryClientFactory.create(session);
Expand Down Expand Up @@ -472,7 +487,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
return tableInfos.stream()
.collect(toImmutableMap(
table -> new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()),
table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream()
table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition())).stream()
.map(BigQueryColumnHandle::getColumnMetadata)
.collect(toImmutableList())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Optional;

import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT;
import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
Expand Down Expand Up @@ -140,7 +141,7 @@ private TableInfo getActualTable(
{
TableDefinition tableDefinition = remoteTable.getDefinition();
TableDefinition.Type tableType = tableDefinition.getType();
if (tableType == TABLE || tableType == SNAPSHOT) {
if (tableType == TABLE || tableType == SNAPSHOT || tableType == EXTERNAL) {
marcinsbd marked this conversation as resolved.
Show resolved Hide resolved
return remoteTable;
}
if (tableType == VIEW || tableType == MATERIALIZED_VIEW) {
Expand All @@ -152,7 +153,7 @@ private TableInfo getActualTable(
// get it from the view
return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter);
}
// Storage API doesn't support reading other table types (materialized views, external)
// Storage API doesn't support reading other table types (materialized views, non-biglake external tables)
throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported",
tableType, remoteTable.getTableId().getDataset(), remoteTable.getTableId().getTable()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.FieldValue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
Expand Down Expand Up @@ -50,6 +51,7 @@

import static com.google.common.base.Strings.nullToEmpty;
import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.plugin.bigquery.BigQueryQueryRunner.TEST_SCHEMA;
import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
Expand All @@ -62,9 +64,12 @@
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.TestingProperties.requiredNonEmptySystemProperty;
import static io.trino.testing.assertions.Assert.assertConsistently;
import static io.trino.testing.assertions.Assert.assertEventually;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.Assumptions.abort;
Expand All @@ -78,13 +83,15 @@ public abstract class BaseBigQueryConnectorTest
{
protected BigQuerySqlExecutor bigQuerySqlExecutor;
private String gcpStorageBucket;
private String bigQueryConnectionId;

/**
 * Prepares the shared test fixtures: the BigQuery SQL executor and the required
 * system properties naming the GCS bucket and the BigQuery connection.
 * <p>
 * Prerequisite: upload region.csv in resources directory to
 * gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv
 */
@BeforeAll
public void initBigQueryExecutor()
{
    bigQuerySqlExecutor = new BigQuerySqlExecutor();
    // Both properties are mandatory; they are read in this order
    gcpStorageBucket = requiredNonEmptySystemProperty("testing.gcp-storage-bucket");
    bigQueryConnectionId = requiredNonEmptySystemProperty("testing.bigquery-connection-id");
}

@Override
Expand Down Expand Up @@ -774,12 +781,75 @@ public void testBigQueryExternalTable()

assertUpdate("DROP TABLE test." + externalTable);
assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + externalTable + "'");

assertThat(getTableReferenceCountInJob(externalTable)).isEqualTo(1);
}
finally {
onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + externalTable);
}
}

/**
 * Creates a CSV-backed external table bound to the configured BigQuery connection,
 * checks that it can be described, read and dropped, and then checks that no
 * BigQuery job references it.
 */
@Test
public void testBigLakeExternalTable()
{
    String tableName = "test_biglake_external" + randomNameSuffix();
    String qualifiedName = "test." + tableName;
    try {
        onBigQuery("CREATE EXTERNAL TABLE " + qualifiedName +
                " WITH CONNECTION `" + bigQueryConnectionId + "`" +
                " OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");

        // The external table must look and read exactly like tpch.region
        assertThat(query("DESCRIBE " + qualifiedName)).matches("DESCRIBE tpch.region");
        assertThat(query("SELECT * FROM " + qualifiedName)).matches("SELECT * FROM tpch.region");

        assertUpdate("DROP TABLE " + qualifiedName);
        assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + tableName + "'");

        // BigLake tables should not run queries, since they are read directly using the storage read API.
        // Keep checking for a few seconds so that a job showing up late is still noticed.
        Duration observationWindow = new Duration(3, SECONDS);
        Duration pollInterval = new Duration(500, MILLISECONDS);
        assertConsistently(
                observationWindow,
                pollInterval,
                () -> assertThat(getTableReferenceCountInJob(tableName)).isEqualTo(0));
    }
    finally {
        // Clean up on the BigQuery side even when an assertion above failed
        onBigQuery("DROP EXTERNAL TABLE IF EXISTS " + qualifiedName);
    }
}

@Test
public void testExternalObjectTable()
{
String objectExternalTable = "test_object_external" + randomNameSuffix();

try {
onBigQuery("CREATE EXTERNAL TABLE test." + objectExternalTable +
" WITH CONNECTION `" + bigQueryConnectionId + "`" +
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
" OPTIONS (object_metadata = 'SIMPLE', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");
assertQuery("SELECT table_type FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + objectExternalTable + "'", "VALUES 'BASE TABLE'");

assertThat(query("SELECT uri FROM test." + objectExternalTable)).succeeds();

assertUpdate("DROP TABLE test." + objectExternalTable);
assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name LIKE '%" + objectExternalTable + "%'");

assertThat(getTableReferenceCountInJob(objectExternalTable)).isEqualTo(1);
}
finally {
onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + objectExternalTable);
}
}

/**
 * Counts the rows of {@code region-us.INFORMATION_SCHEMA.JOBS} whose
 * {@code referenced_tables} contain an entry with the given table id.
 *
 * @param tableName table id to look for among the tables referenced by jobs
 * @return the single count value returned by the query
 */
private long getTableReferenceCountInJob(String tableName)
{
    String countJobsSql = """
            SELECT count(*) FROM region-us.INFORMATION_SCHEMA.JOBS WHERE EXISTS(
            SELECT * FROM UNNEST(referenced_tables) AS referenced_table
            WHERE referenced_table.table_id = '%s')
            """.formatted(tableName);

    // The query yields one row with one column; onlyElement() enforces that
    return bigQuerySqlExecutor.executeQuery(countJobsSql)
            .streamValues()
            .map(row -> row.getFirst())
            .map(FieldValue::getLongValue)
            .collect(onlyElement());
}

ebyhr marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void testQueryLabeling()
{
Expand Down Expand Up @@ -1460,4 +1530,5 @@ private void onBigQuery(@Language("SQL") String sql)
{
bigQuerySqlExecutor.execute(sql);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ public static <E extends Exception> void assertEventually(Duration timeout, Dura
}
}

/**
 * Runs {@code assertion} repeatedly and requires every run to pass until more
 * than {@code timeout} has elapsed since the first call.
 * <p>
 * The assertion is always evaluated before the elapsed time is checked, so it
 * runs at least once on a non-interrupted thread. Between runs the calling
 * thread sleeps for {@code retryFrequency}.
 *
 * @param timeout how long the assertion has to keep holding
 * @param retryFrequency pause between two consecutive evaluations
 * @param assertion the check to evaluate; any exception it throws propagates immediately
 * @throws E when the assertion fails
 * @throws RuntimeException wrapping an {@link InterruptedException} when the calling
 * thread is interrupted before the assertion has held for the whole {@code timeout};
 * the thread's interrupt status is left set
 */
public static <E extends Exception> void assertConsistently(Duration timeout, Duration retryFrequency, Assert.CheckedRunnable<E> assertion)
        throws E
{
    long start = System.nanoTime();
    while (!Thread.currentThread().isInterrupted()) {
        assertion.run();

        if (Duration.nanosSince(start).compareTo(timeout) > 0) {
            // The assertion held for the whole observation window
            return;
        }

        try {
            Thread.sleep(retryFrequency.toMillis());
        }
        catch (InterruptedException e) {
            // Restore the interrupt status for callers before failing
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    // The loop is only left this way when the thread is interrupted (for example
    // already on entry). Returning normally would report success although the
    // assertion was not observed for the whole timeout, so fail the same way as
    // an interrupted sleep does. The interrupt status is still set at this point.
    throw new RuntimeException(new InterruptedException("Interrupted before the assertion held for " + timeout));
}

public interface CheckedRunnable<E extends Exception>
{
void run()
Expand Down
Loading