From 56af27891d3001a0358bfd5b1bb4e3f76dee4c6c Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Mon, 20 Feb 2023 16:47:32 +0100 Subject: [PATCH] Label BigQuery jobs with Trino query id ConnectorSession needs to be passed to the query/update methods because BigQueryClient is cached using identityCacheMapping.getRemoteUserCacheKey(), which does not take session properties into account. We also need access to the query id in order to label queries properly, but we don't want to cache a client per query id. --- .../trino/plugin/bigquery/BigQueryClient.java | 38 ++++++----- .../bigquery/BigQueryClientFactory.java | 7 +- .../trino/plugin/bigquery/BigQueryConfig.java | 38 +++++++++++ .../bigquery/BigQueryConnectorModule.java | 9 +++ .../plugin/bigquery/BigQueryLabelFactory.java | 64 +++++++++++++++++++ .../plugin/bigquery/BigQueryMetadata.java | 10 ++- .../bigquery/BigQueryPageSourceProvider.java | 7 +- .../bigquery/BigQueryQueryPageSource.java | 9 ++- .../plugin/bigquery/BigQuerySplitManager.java | 6 +- .../bigquery/BaseBigQueryConnectorTest.java | 47 ++++++++++++++ .../TestBigQueryAvroConnectorTest.java | 2 +- .../plugin/bigquery/TestBigQueryConfig.java | 6 ++ 12 files changed, 205 insertions(+), 38 deletions(-) create mode 100644 plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index 112b20de3af1..9f124cd25f3d 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -22,7 +22,6 @@ import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.JobStatistics; import 
com.google.cloud.bigquery.JobStatistics.QueryStatistics; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -40,6 +39,7 @@ import io.airlift.units.Duration; import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -68,6 +68,8 @@ import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -81,6 +83,7 @@ public class BigQueryClient static final Set TABLE_TYPES = ImmutableSet.of(TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL, SNAPSHOT); private final BigQuery bigQuery; + private final BigQueryLabelFactory labelFactory; private final ViewMaterializationCache materializationCache; private final boolean caseInsensitiveNameMatching; private final LoadingCache> remoteDatasetCache; @@ -88,12 +91,14 @@ public class BigQueryClient public BigQueryClient( BigQuery bigQuery, + BigQueryLabelFactory labelFactory, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache, Duration metadataCacheTtl, Optional configProjectId) { this.bigQuery = requireNonNull(bigQuery, "bigQuery is null"); + this.labelFactory = requireNonNull(labelFactory, "labelFactory is null"); this.materializationCache = requireNonNull(materializationCache, "materializationCache is null"); this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; this.remoteDatasetCache = 
EvictableCacheBuilder.newBuilder() @@ -266,30 +271,33 @@ Job create(JobInfo jobInfo) return bigQuery.create(jobInfo); } - public void executeUpdate(QueryJobConfiguration job) + public void executeUpdate(ConnectorSession session, QueryJobConfiguration job) { log.debug("Execute query: %s", job.getQuery()); - try { - bigQuery.query(job); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e); - } + execute(session, job); } - public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition) + public TableResult executeQuery(ConnectorSession session, String sql) { log.debug("Execute query: %s", sql); + QueryJobConfiguration job = QueryJobConfiguration.newBuilder(sql) + .setUseQueryCache(isQueryResultsCacheEnabled(session)) + .setCreateDisposition(createDisposition(session)) + .build(); + return execute(session, job); + } + + private TableResult execute(ConnectorSession session, QueryJobConfiguration job) + { + QueryJobConfiguration jobWithQueryLabel = job.toBuilder() + .setLabels(labelFactory.getLabels(session)) + .build(); try { - return bigQuery.query(QueryJobConfiguration.newBuilder(sql) - .setUseQueryCache(useQueryResultsCache) - .setCreateDisposition(createDisposition) - .build()); + return bigQuery.query(jobWithQueryLabel); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e); + throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java index bc1a4d0280c8..4cde08c3bc57 100644 --- 
a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java @@ -41,6 +41,7 @@ public class BigQueryClientFactory private final boolean caseInsensitiveNameMatching; private final ViewMaterializationCache materializationCache; private final HeaderProvider headerProvider; + private final BigQueryLabelFactory labelFactory; private final NonEvictableCache clientCache; private final Duration metadataCacheTtl; @@ -50,7 +51,8 @@ public BigQueryClientFactory( BigQueryCredentialsSupplier credentialsSupplier, BigQueryConfig bigQueryConfig, ViewMaterializationCache materializationCache, - HeaderProvider headerProvider) + HeaderProvider headerProvider, + BigQueryLabelFactory labelFactory) { this.identityCacheMapping = requireNonNull(identityCacheMapping, "identityCacheMapping is null"); this.credentialsSupplier = requireNonNull(credentialsSupplier, "credentialsSupplier is null"); @@ -60,6 +62,7 @@ public BigQueryClientFactory( this.caseInsensitiveNameMatching = bigQueryConfig.isCaseInsensitiveNameMatching(); this.materializationCache = requireNonNull(materializationCache, "materializationCache is null"); this.headerProvider = requireNonNull(headerProvider, "headerProvider is null"); + this.labelFactory = requireNonNull(labelFactory, "labelFactory is null"); this.metadataCacheTtl = bigQueryConfig.getMetadataCacheTtl(); CacheBuilder cacheBuilder = CacheBuilder.newBuilder() @@ -77,7 +80,7 @@ public BigQueryClient create(ConnectorSession session) protected BigQueryClient createBigQueryClient(ConnectorSession session) { - return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); + return new BigQueryClient(createBigQuery(session), labelFactory, caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); } protected BigQuery createBigQuery(ConnectorSession session) 
diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index 349eecf9e67c..879ec6436e06 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -19,8 +19,10 @@ import io.airlift.configuration.DefunctConfig; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import io.trino.plugin.base.logging.SessionInterpolatedValues; import javax.annotation.PostConstruct; +import javax.validation.constraints.AssertTrue; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -28,6 +30,7 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.base.logging.FormatInterpolator.hasValidPlaceholders; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; @@ -57,6 +60,9 @@ public class BigQueryConfig private Duration metadataCacheTtl = new Duration(0, MILLISECONDS); private boolean queryResultsCacheEnabled; + private String queryLabelName; + private String queryLabelFormat; + private int rpcInitialChannelCount = 1; private int rpcMinChannelCount = 1; private int rpcMaxChannelCount = 1; @@ -266,6 +272,38 @@ public BigQueryConfig setQueryResultsCacheEnabled(boolean queryResultsCacheEnabl return this; } + public String getQueryLabelFormat() + { + return queryLabelFormat; + } + + @Config("bigquery.job.label-format") + @ConfigDescription("Adds `bigquery.job.label-name` label to the BigQuery job with provided value format") + public BigQueryConfig setQueryLabelFormat(String queryLabelFormat) + { + this.queryLabelFormat = queryLabelFormat; + return this; + } + + @AssertTrue(message = 
"Incorrect bigquery.job.label-format may consist of only letters, digits, underscores, commas, spaces, equal signs and predefined values") + boolean isQueryLabelFormatValid() + { + return queryLabelFormat == null || hasValidPlaceholders(queryLabelFormat, SessionInterpolatedValues.values()); + } + + public String getQueryLabelName() + { + return queryLabelName; + } + + @Config("bigquery.job.label-name") + @ConfigDescription("Adds label with the given name to the BigQuery job") + public BigQueryConfig setQueryLabelName(String queryLabelName) + { + this.queryLabelName = queryLabelName; + return this; + } + @Min(1) @Max(MAX_RPC_CONNECTIONS) public int getRpcInitialChannelCount() diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java index 27081053e7ea..ab35b89426fe 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java @@ -20,6 +20,8 @@ import com.google.inject.Scopes; import com.google.inject.Singleton; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.base.logging.FormatInterpolator; +import io.trino.plugin.base.logging.SessionInterpolatedValues; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.bigquery.ptf.Query; import io.trino.spi.NodeManager; @@ -81,6 +83,13 @@ public static HeaderProvider createHeaderProvider(NodeManager nodeManager) return FixedHeaderProvider.create("user-agent", "Trino/" + nodeManager.getCurrentNode().getVersion()); } + @Provides + @Singleton + public static BigQueryLabelFactory labelFactory(BigQueryConfig config) + { + return new BigQueryLabelFactory(config.getQueryLabelName(), new FormatInterpolator<>(config.getQueryLabelFormat(), SessionInterpolatedValues.values())); + } + /** * Apache 
Arrow requires reflective access to certain Java internals prohibited since Java 17. * Adds an error to the {@code binder} if required --add-opens is not passed to the JVM. diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java new file mode 100644 index 000000000000..1586f416dfce --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.bigquery; + +import com.google.common.base.CharMatcher; +import io.trino.plugin.base.logging.FormatInterpolator; +import io.trino.spi.connector.ConnectorSession; + +import java.util.Map; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class BigQueryLabelFactory +{ + private static final CharMatcher ALLOWED_CHARS = CharMatcher.inRange('a', 'z') + .or(CharMatcher.inRange('0', '9')) + .or(CharMatcher.anyOf("_-")) + .precomputed(); + + private static final int MAX_LABEL_VALUE_LENGTH = 63; + private final String name; + private final FormatInterpolator interpolator; + + public BigQueryLabelFactory(String labelName, FormatInterpolator interpolator) + { + this.name = labelName; + this.interpolator = requireNonNull(interpolator, "interpolator is null"); + } + + public Map getLabels(ConnectorSession session) + { + if (isNullOrEmpty(name)) { + return Map.of(); + } + + String value = interpolator.interpolate(session).trim(); + if (isNullOrEmpty(value)) { + return Map.of(); + } + + verifyLabelValue(name); + verifyLabelValue(value); + return Map.of(name, value); + } + + private void verifyLabelValue(String value) + { + verify(value.length() <= MAX_LABEL_VALUE_LENGTH, "BigQuery label value cannot be longer than %s characters", MAX_LABEL_VALUE_LENGTH); + verify(ALLOWED_CHARS.matchesAllOf(value), "BigQuery label value can contain only lowercase letters, numeric characters, underscores, and dashes"); + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index 66fc41f07778..fba723599ca6 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -521,7 +521,7 @@ public 
void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa quote(remoteTableName.getProjectId()), quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName())); - client.executeUpdate(QueryJobConfiguration.of(sql)); + client.executeUpdate(session, QueryJobConfiguration.of(sql)); } @Override @@ -603,7 +603,7 @@ private Optional finishInsert( quote(pageSinkIdColumnName), quote(pageSinkIdColumnName)); - client.executeUpdate(QueryJobConfiguration.of(insertSql)); + client.executeUpdate(session, QueryJobConfiguration.of(insertSql)); } finally { try { @@ -636,8 +636,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table quote(remoteTableName.getProjectId()), quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName())); - client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) + client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } @@ -656,8 +655,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName()), quote(column.getName())); - client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) + client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 2cdf2f858566..5c80862ae7cc 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -30,8 +30,6 @@ import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.util.Objects.requireNonNull; public class BigQueryPageSourceProvider @@ -112,12 +110,11 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List columnHandles, Optional filter) { return new BigQueryQueryPageSource( + session, bigQueryClientFactory.create(session), table, columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()), - filter, - isQueryResultsCacheEnabled(session), - createDisposition(session)); + filter); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 11daeac84d56..125166465f7c 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -15,7 +15,6 @@ import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableList; @@ -27,6 +26,7 @@ import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.ArrayType; import 
io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; @@ -80,13 +80,12 @@ public class BigQueryQueryPageSource private boolean finished; public BigQueryQueryPageSource( + ConnectorSession session, BigQueryClient client, BigQueryTableHandle table, List columnNames, List columnTypes, - Optional filter, - boolean useQueryResultsCache, - CreateDisposition createDisposition) + Optional filter) { requireNonNull(client, "client is null"); requireNonNull(table, "table is null"); @@ -97,7 +96,7 @@ public BigQueryQueryPageSource( this.columnTypes = ImmutableList.copyOf(columnTypes); this.pageBuilder = new PageBuilder(columnTypes); String sql = buildSql(table, client.getProjectId(), ImmutableList.copyOf(columnNames), filter); - this.tableResult = client.query(sql, useQueryResultsCache, createDisposition); + this.tableResult = client.executeQuery(session, sql); } private static String buildSql(BigQueryTableHandle table, String projectId, List columnNames, Optional filter) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index 0c4803b8a316..7bc0d673682b 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -50,8 +50,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization; import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; import 
static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -160,7 +158,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl if (filter.isPresent()) { // count the rows based on the filter String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.executeQuery(session, sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { @@ -170,7 +168,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl // (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections. if (TABLE_TYPES.contains(tableInfo.getDefinition().getType())) { String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.executeQuery(session, sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index 40121b4af216..34a9be05772f 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -16,8 +16,10 @@ import com.google.cloud.bigquery.TableDefinition; import io.airlift.units.Duration; import io.trino.Session; +import io.trino.spi.QueryId; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import org.intellij.lang.annotations.Language; @@ -32,6 +34,7 @@ import 
java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.function.Function; import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL; import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW; @@ -606,6 +609,50 @@ public void testBigQueryExternalTable() } } + @Test + public void testQueryLabeling() + { + Function sessionWithToken = token -> Session.builder(getSession()) + .setTraceToken(Optional.of(token)) + .build(); + + String materializedView = "test_query_label" + randomNameSuffix(); + try { + onBigQuery("CREATE MATERIALIZED VIEW test." + materializedView + " AS SELECT count(1) AS cnt FROM tpch.region"); + + @Language("SQL") + String query = "SELECT * FROM test." + materializedView; + + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("first_token"), query); + assertLabelForTable(materializedView, result.getQueryId(), "first_token"); + + MaterializedResultWithQueryId result2 = getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("second_token"), query); + assertLabelForTable(materializedView, result2.getQueryId(), "second_token"); + + assertThatThrownBy(() -> getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("InvalidToken"), query)) + .hasMessageContaining("BigQuery label value can contain only lowercase letters, numeric characters, underscores, and dashes"); + } + finally { + onBigQuery("DROP MATERIALIZED VIEW IF EXISTS test." 
+ materializedView); + } + } + + private void assertLabelForTable(String expectedView, QueryId queryId, String traceToken) + { + String expectedLabel = "q_" + queryId.toString() + "__t_" + traceToken; + + @Language("SQL") + String checkForLabelQuery = """ + SELECT * FROM region-us.INFORMATION_SCHEMA.JOBS_BY_USER WHERE EXISTS( + SELECT * FROM UNNEST(labels) AS label WHERE label.key = 'trino_query' AND label.value = '%s' + )""".formatted(expectedLabel); + + assertThat(bigQuerySqlExecutor.executeQuery(checkForLabelQuery).getValues()) + .extracting(values -> values.get("query").getStringValue()) + .singleElement() + .matches(statement -> statement.contains(expectedView)); + } + @Test public void testQueryCache() { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java index e1e5cbd9b81b..bb4566271ab9 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java @@ -46,7 +46,7 @@ protected QueryRunner createQueryRunner() { return BigQueryQueryRunner.createQueryRunner( ImmutableMap.of(), - ImmutableMap.of(), + ImmutableMap.of("bigquery.job.label-name", "trino_query", "bigquery.job.label-format", "q_$QUERY_ID__t_$TRACE_TOKEN"), REQUIRED_TPCH_TABLES); } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index af1b38892234..5aabfecef005 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -49,6 +49,8 @@ public void testDefaults() .setViewsEnabled(false) .setArrowSerializationEnabled(false) 
.setQueryResultsCacheEnabled(false) + .setQueryLabelName(null) + .setQueryLabelFormat(null) .setRpcInitialChannelCount(1) .setMinRpcPerChannel(0) .setMaxRpcPerChannel(Integer.MAX_VALUE) @@ -75,6 +77,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.service-cache-ttl", "10d") .put("bigquery.metadata.cache-ttl", "5d") .put("bigquery.query-results-cache.enabled", "true") + .put("bigquery.job.label-name", "trino_job_name") + .put("bigquery.job.label-format", "$TRACE_TOKEN") .put("bigquery.channel-pool.initial-size", "11") .put("bigquery.channel-pool.min-size", "12") .put("bigquery.channel-pool.max-size", "13") @@ -98,6 +102,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setServiceCacheTtl(new Duration(10, DAYS)) .setMetadataCacheTtl(new Duration(5, DAYS)) .setQueryResultsCacheEnabled(true) + .setQueryLabelName("trino_job_name") + .setQueryLabelFormat("$TRACE_TOKEN") .setRpcInitialChannelCount(11) .setRpcMinChannelCount(12) .setRpcMaxChannelCount(13)