From e9abe1c6c09fb93de33039e0d40d6c8006265d86 Mon Sep 17 00:00:00 2001
From: Anoop Johnson
Date: Wed, 13 Mar 2024 02:25:19 +0000
Subject: [PATCH] Use BigQuery storage read API when reading external BigLake
 tables

The storage read API supports BigLake external tables (i.e., external
tables with a connection), but the current implementation reads them
through views, which can be expensive because every read requires a
query job. This PR adds support for reading BigLake tables directly
using the storage read API. There is no behavior change for other
external tables and for BQ native tables: they continue to use the
view-based and storage read paths, respectively.

Added a new test for BigLake tables.

Co-authored-by: Marcin Rusek
---
 .github/workflows/ci.yml                      |  4 +-
 plugin/trino-bigquery/README.md               | 11 +++-
 .../plugin/bigquery/BigQueryMetadata.java     | 25 ++++++--
 .../plugin/bigquery/ReadSessionCreator.java   |  5 +-
 .../bigquery/BaseBigQueryConnectorTest.java   | 64 +++++++++++++++++++
 5 files changed, 99 insertions(+), 10 deletions(-)
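For context, reading a BigLake table directly through the storage read API creates
no query job, which is what makes it cheaper than the view-based path. The sketch
below shows roughly what such a direct read looks like with the standalone storage
client, outside of Trino; the project, dataset, and table names are placeholders,
not values used by this change.

    import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
    import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
    import com.google.cloud.bigquery.storage.v1.DataFormat;
    import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
    import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
    import com.google.cloud.bigquery.storage.v1.ReadSession;

    public class BigLakeDirectReadSketch
    {
        public static void main(String[] args)
                throws Exception
        {
            // Fully qualified path of a BigLake table (placeholder names)
            String table = "projects/my-project/datasets/test/tables/my_biglake_table";
            try (BigQueryReadClient client = BigQueryReadClient.create()) {
                // A read session enumerates streams that serve the table's rows directly,
                // without going through a query job
                ReadSession session = client.createReadSession(CreateReadSessionRequest.newBuilder()
                        .setParent("projects/my-project")
                        .setReadSession(ReadSession.newBuilder()
                                .setTable(table)
                                .setDataFormat(DataFormat.AVRO))
                        .setMaxStreamCount(1)
                        .build());
                // Each response carries a batch of Avro-encoded rows; decode with an Avro reader
                for (ReadRowsResponse response : client.readRowsCallable().call(
                        ReadRowsRequest.newBuilder()
                                .setReadStream(session.getStreams(0).getName())
                                .build())) {
                    System.out.println(response.getRowCount() + " rows in batch");
                }
            }
        }
    }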
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f59aaa7490c..a72a1f4acd4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -638,13 +638,15 @@ jobs:
           GCP_STORAGE_BUCKET: ${{ vars.GCP_STORAGE_BUCKET }}
           BIGQUERY_TESTING_PROJECT_ID: ${{ vars.BIGQUERY_TESTING_PROJECT_ID }}
           BIGQUERY_TESTING_PARENT_PROJECT_ID: ${{ vars.BIGQUERY_TESTING_PARENT_PROJECT_ID }}
+          BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID: ${{ vars.BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID }}
         if: matrix.modules == 'plugin/trino-bigquery' && !contains(matrix.profile, 'cloud-tests-2') && (env.CI_SKIP_SECRETS_PRESENCE_CHECKS != '' || env.BIGQUERY_CREDENTIALS_KEY != '')
         run: |
           $MAVEN test ${MAVEN_TEST} -pl :trino-bigquery -Pcloud-tests-1 \
             -Dbigquery.credentials-key="${BIGQUERY_CREDENTIALS_KEY}" \
             -Dtesting.gcp-storage-bucket="${GCP_STORAGE_BUCKET}" \
             -Dtesting.bigquery-project-id="${BIGQUERY_TESTING_PROJECT_ID}" \
-            -Dtesting.bigquery-parent-project-id="${BIGQUERY_TESTING_PARENT_PROJECT_ID}"
+            -Dtesting.bigquery-parent-project-id="${BIGQUERY_TESTING_PARENT_PROJECT_ID}" \
+            -Dtesting.bigquery-connection-id="${BIGQUERY_TESTING_BIGLAKE_CONNECTION_ID}"
       - name: Cloud BigQuery Smoke Tests
         id: tests-bq-smoke
         env:
diff --git a/plugin/trino-bigquery/README.md b/plugin/trino-bigquery/README.md
index 2148a9746a4..8cae94c1c1a 100644
--- a/plugin/trino-bigquery/README.md
+++ b/plugin/trino-bigquery/README.md
@@ -22,9 +22,16 @@ You can follow the steps below to be able to run the integration tests locally.
   **BigQuery Admin** role assigned.
 * Get the base64 encoded text of the service account credentials file using
   `base64 /path/to/service_account_credentials.json`.
+* Create a new BigQuery `CLOUD_RESOURCE` connection and grant its service account access to GCS.
+  See the [documentation](https://cloud.google.com/bigquery/docs/create-cloud-resource-connection) for details.
+  * Create the connection:
+    `bq mk --connection --location=us --project_id=$PROJECT_ID --connection_type=CLOUD_RESOURCE $CONNECTION_ID`
+  * Run `bq show --connection $PROJECT_ID.us.$CONNECTION_ID` to display the connection's service account ID.
+  * Grant that service account the GCS Object Viewer role, e.g.:
+    `gsutil iam ch serviceAccount:bqcx-xxxx@gcp-sa-bigquery-condel.iam.gserviceaccount.com:objectViewer gs://DESTINATION_BUCKET_NAME`
 * The `TestBigQueryWithDifferentProjectIdConnectorSmokeTest` requires an alternate project ID that is different
   from the project ID attached to the service account, but that the service account can still access.
-* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, and `testing.alternate-bq-project-id` in the IntelliJ "Run Configuration"
+* Set the VM options `bigquery.credentials-key`, `testing.gcp-storage-bucket`, `testing.alternate-bq-project-id`, and `testing.bigquery-connection-id` in the IntelliJ "Run Configuration"
   (or on the CLI if using Maven directly). It should look something like
-  `-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate`.
+  `-Dbigquery.credentials-key=base64-text -Dtesting.gcp-storage-bucket=DESTINATION_BUCKET_NAME -Dtesting.alternate-bq-project-id=bigquery-cicd-alternate -Dtesting.bigquery-connection-id=my_project.us.connection-id`.
 * Run any test of your choice.
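Before the code change itself, it may help to see how the BigQuery Java client
models a BigLake table: an ExternalTableDefinition that carries a connection ID and
no object metadata. A minimal standalone sketch of the same classification the
connector performs below (table names are placeholders, and the table is assumed to
exist):

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.ExternalTableDefinition;
    import com.google.cloud.bigquery.Table;
    import com.google.cloud.bigquery.TableId;

    public class BigLakeCheckSketch
    {
        public static void main(String[] args)
        {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            Table table = bigquery.getTable(TableId.of("my_dataset", "my_biglake_table"));
            if (table.getDefinition() instanceof ExternalTableDefinition external) {
                // BigLake: has a connection, is not an object table, and reads row data from GCS
                boolean bigLake = external.getConnectionId() != null
                        && external.getObjectMetadata() == null
                        && external.getSourceUris() != null
                        && external.getSourceUris().stream().allMatch(uri -> uri.startsWith("gs://"));
                System.out.println(bigLake ? "read via storage API" : "read via view");
            }
        }
    }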
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
index feb08a62f5f..7da59135183 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java
@@ -17,6 +17,7 @@
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.bigquery.DatasetInfo;
+import com.google.cloud.bigquery.ExternalTableDefinition;
 import com.google.cloud.bigquery.Field;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.QueryParameterValue;
@@ -118,6 +119,7 @@
 import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -340,7 +342,7 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
             return null;
         }
 
-        boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition().getType());
+        boolean useStorageApi = useStorageApi(session, schemaTableName.getTableName(), tableInfo.get().getDefinition());
         ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
         columns.addAll(client.buildColumnHandles(tableInfo.get(), useStorageApi));
         Optional<BigQueryPartitionType> partitionType = getPartitionType(tableInfo.get().getDefinition());
@@ -360,14 +362,15 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
                 .withProjectedColumns(columns.build());
     }
 
-    private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition.Type type)
+    private static boolean useStorageApi(ConnectorSession session, String tableName, TableDefinition tableDefinition)
     {
+        TableDefinition.Type type = tableDefinition.getType();
         if (isWildcardTable(type, tableName)) {
             // Storage API doesn't support reading wildcard tables
             return false;
         }
-        if (type == EXTERNAL) {
-            // Storage API doesn't support reading external tables
+        if (type == EXTERNAL && !isBigLakeTable(tableDefinition)) {
+            // Storage API doesn't support reading external tables, except BigLake tables
             return false;
         }
         if ((type == VIEW || type == MATERIALIZED_VIEW) && isSkipViewMaterialization(session)) {
@@ -376,6 +379,18 @@ private static boolean useStorageApi(ConnectorSession session, String tableName,
         return true;
     }
 
+    private static boolean isBigLakeTable(TableDefinition tableDefinition)
+    {
+        if (tableDefinition instanceof ExternalTableDefinition externalTableDefinition) {
+            // BigLake tables are external tables that have a connectionId and no objectMetadata
+            List<String> sourceUris = externalTableDefinition.getSourceUris();
+            return !isNullOrEmpty(externalTableDefinition.getConnectionId()) &&
+                    isNullOrEmpty(externalTableDefinition.getObjectMetadata()) &&
+                    (sourceUris != null && sourceUris.stream().allMatch(uri -> uri.startsWith("gs://")));
+        }
+        return false;
+    }
+
     private Optional<TableInfo> getTableInfoIgnoringConflicts(ConnectorSession session, SchemaTableName schemaTableName)
     {
         BigQueryClient client = bigQueryClientFactory.create(session);
@@ -470,7 +485,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSess
         return tableInfos.stream()
                 .collect(toImmutableMap(
                         table -> new SchemaTableName(table.getTableId().getDataset(), table.getTableId().getTable()),
-                        table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition().getType())).stream()
+                        table -> client.buildColumnHandles(table, useStorageApi(session, table.getTableId().getTable(), table.getDefinition())).stream()
                                 .map(BigQueryColumnHandle::getColumnMetadata)
                                 .collect(toImmutableList())));
     }
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java
index 8a28d3a99a2..7b2339bc067 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/ReadSessionCreator.java
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Optional;
 
+import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL;
 import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
 import static com.google.cloud.bigquery.TableDefinition.Type.SNAPSHOT;
 import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
@@ -140,7 +141,7 @@ private TableInfo getActualTable(
     {
         TableDefinition tableDefinition = remoteTable.getDefinition();
         TableDefinition.Type tableType = tableDefinition.getType();
-        if (tableType == TABLE || tableType == SNAPSHOT) {
+        if (tableType == TABLE || tableType == SNAPSHOT || tableType == EXTERNAL) {
             return remoteTable;
         }
         if (tableType == VIEW || tableType == MATERIALIZED_VIEW) {
@@ -152,7 +153,7 @@ private TableInfo getActualTable(
             // get it from the view
             return client.getCachedTable(viewExpiration, remoteTable, requiredColumns, filter);
         }
-        // Storage API doesn't support reading other table types (materialized views, external)
+        // Storage API doesn't support reading other table types (materialized views, external tables except BigLake tables)
         throw new TrinoException(NOT_SUPPORTED, format("Table type '%s' of table '%s.%s' is not supported", tableType, remoteTable.getTableId().getDataset(), remoteTable.getTableId().getTable()));
     }
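For comparison, the view path that stays in place for non-BigLake external tables
(and for views) materializes data through a query job, roughly equivalent to the
sketch below; those jobs are what the new INFORMATION_SCHEMA.JOBS assertions in the
tests count. Table names are placeholders.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.QueryJobConfiguration;
    import com.google.cloud.bigquery.TableResult;

    public class ViewPathSketch
    {
        public static void main(String[] args)
                throws Exception
        {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            // Runs a query job, which shows up in INFORMATION_SCHEMA.JOBS with the
            // external table listed among its referenced tables
            TableResult result = bigquery.query(QueryJobConfiguration.newBuilder(
                    "SELECT * FROM `my_dataset.my_external_table`").build());
            result.iterateAll().forEach(row -> System.out.println(row.get(0).getValue()));
        }
    }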
diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
index ff9a6aa6c7a..e54c7d31201 100644
--- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
+++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java
@@ -42,6 +42,7 @@
 import static com.google.common.base.Strings.nullToEmpty;
 import static com.google.common.collect.ImmutableMultiset.toImmutableMultiset;
+import static com.google.common.collect.MoreCollectors.onlyElement;
 import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
 import static io.trino.plugin.bigquery.BigQueryQueryRunner.TEST_SCHEMA;
 import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE;
@@ -66,6 +67,7 @@ public abstract class BaseBigQueryConnectorTest
 {
     protected BigQuerySqlExecutor bigQuerySqlExecutor;
     private String gcpStorageBucket;
+    private String bigQueryConnectionId;
 
     @BeforeAll
     public void initBigQueryExecutor()
@@ -73,6 +75,7 @@ public void initBigQueryExecutor()
         this.bigQuerySqlExecutor = new BigQuerySqlExecutor();
         // Prerequisite: upload region.csv in resources directory to gs://{testing.gcp-storage-bucket}/tpch/tiny/region.csv
         this.gcpStorageBucket = requiredNonEmptySystemProperty("testing.gcp-storage-bucket");
+        this.bigQueryConnectionId = requiredNonEmptySystemProperty("testing.bigquery-connection-id");
     }
 
     @Override
@@ -761,12 +764,73 @@ public void testBigQueryExternalTable()
 
             assertUpdate("DROP TABLE test." + externalTable);
             assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + externalTable + "'");
+
+            assertThat(getTableReferenceCountInJob(externalTable)).isEqualTo(1);
         }
         finally {
             onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + externalTable);
         }
     }
 
+    @Test
+    public void testBigLakeExternalTable()
+    {
+        String biglakeExternalTable = "test_biglake_external" + randomNameSuffix();
+        try {
+            onBigQuery("CREATE EXTERNAL TABLE test." + biglakeExternalTable +
+                    " WITH CONNECTION `" + bigQueryConnectionId + "`" +
+                    " OPTIONS (format = 'CSV', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");
+            assertQuery("SELECT table_type FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + biglakeExternalTable + "'", "VALUES 'BASE TABLE'");
+
+            assertThat(query("DESCRIBE test." + biglakeExternalTable)).matches("DESCRIBE tpch.region");
+            assertThat(query("SELECT * FROM test." + biglakeExternalTable)).matches("SELECT * FROM tpch.region");
+
+            assertUpdate("DROP TABLE test." + biglakeExternalTable);
+            assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + biglakeExternalTable + "'");
+
+            // BigLake tables should not run queries, since they are read directly using the storage read API
+            assertThat(getTableReferenceCountInJob(biglakeExternalTable)).isEqualTo(0);
+        }
+        finally {
+            onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + biglakeExternalTable);
+        }
+    }
+
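+    // Object tables (created with object_metadata = 'SIMPLE') are metadata tables over GCS
+    // objects: each row describes an object (uri, size, ...) rather than carrying row data,
+    // so isBigLakeTable() excludes them and they keep using the view-based read path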
+    @Test
+    public void testExternalObjectTable()
+    {
+        String objectExternalTable = "test_object_external" + randomNameSuffix();
+
+        try {
+            onBigQuery("CREATE EXTERNAL TABLE test." + objectExternalTable +
+                    " WITH CONNECTION `" + bigQueryConnectionId + "`" +
+                    " OPTIONS (object_metadata = 'SIMPLE', uris = ['gs://" + gcpStorageBucket + "/tpch/tiny/region.csv'])");
+            assertQuery("SELECT table_type FROM information_schema.tables WHERE table_schema = 'test' AND table_name = '" + objectExternalTable + "'", "VALUES 'BASE TABLE'");
+
+            assertThat(query("SELECT uri FROM test." + objectExternalTable)).succeeds();
+
+            assertUpdate("DROP TABLE test." + objectExternalTable);
+            assertQueryReturnsEmptyResult("SELECT * FROM information_schema.tables WHERE table_schema = 'test' AND table_name LIKE '%" + objectExternalTable + "%'");
+
+            // Object tables are not BigLake tables, so they are read through a view, which runs one query job
+            assertThat(getTableReferenceCountInJob(objectExternalTable)).isEqualTo(1);
+        }
+        finally {
+            onBigQuery("DROP EXTERNAL TABLE IF EXISTS test." + objectExternalTable);
+        }
+    }
+
+    private long getTableReferenceCountInJob(String tableName)
+    {
+        return bigQuerySqlExecutor.executeQuery("""
+                SELECT count(*) FROM `region-us`.INFORMATION_SCHEMA.JOBS WHERE EXISTS(
+                    SELECT * FROM UNNEST(referenced_tables) AS referenced_table
+                    WHERE referenced_table.table_id = '%s')
+                """.formatted(tableName)).streamValues()
+                .map(fieldValues -> fieldValues.getFirst().getLongValue())
+                .collect(onlyElement());
+    }
+
     @Test
     public void testQueryLabeling()
     {