diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index 112b20de3af1..9f124cd25f3d 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -22,7 +22,6 @@ import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatistics.QueryStatistics; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -40,6 +39,7 @@ import io.airlift.units.Duration; import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -68,6 +68,8 @@ import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -81,6 +83,7 @@ public class BigQueryClient static final Set TABLE_TYPES = ImmutableSet.of(TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL, SNAPSHOT); private final BigQuery bigQuery; + private final BigQueryLabelFactory labelFactory; private final ViewMaterializationCache materializationCache; private final boolean caseInsensitiveNameMatching; private final LoadingCache> remoteDatasetCache; @@ -88,12 +91,14 @@ public class BigQueryClient public BigQueryClient( BigQuery bigQuery, + BigQueryLabelFactory labelFactory, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache, Duration metadataCacheTtl, Optional configProjectId) { this.bigQuery = requireNonNull(bigQuery, "bigQuery is null"); + this.labelFactory = requireNonNull(labelFactory, "labelFactory is null"); this.materializationCache = requireNonNull(materializationCache, "materializationCache is null"); this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; this.remoteDatasetCache = EvictableCacheBuilder.newBuilder() @@ -266,30 +271,33 @@ Job create(JobInfo jobInfo) return bigQuery.create(jobInfo); } - public void executeUpdate(QueryJobConfiguration job) + public void executeUpdate(ConnectorSession session, QueryJobConfiguration job) { log.debug("Execute query: %s", job.getQuery()); - try { - bigQuery.query(job); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e); - } + execute(session, job); } - public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition) + public TableResult executeQuery(ConnectorSession session, String sql) { log.debug("Execute query: %s", sql); + QueryJobConfiguration job = QueryJobConfiguration.newBuilder(sql) + .setUseQueryCache(isQueryResultsCacheEnabled(session)) + .setCreateDisposition(createDisposition(session)) + .build(); + return execute(session, job); + } + + private TableResult execute(ConnectorSession session, QueryJobConfiguration job) + { + QueryJobConfiguration jobWithQueryLabel = job.toBuilder() + .setLabels(labelFactory.getLabels(session)) + .build(); try { - return bigQuery.query(QueryJobConfiguration.newBuilder(sql) - .setUseQueryCache(useQueryResultsCache) - .setCreateDisposition(createDisposition) - .build()); + return bigQuery.query(jobWithQueryLabel); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e); + throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java index bc1a4d0280c8..4cde08c3bc57 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java @@ -41,6 +41,7 @@ public class BigQueryClientFactory private final boolean caseInsensitiveNameMatching; private final ViewMaterializationCache materializationCache; private final HeaderProvider headerProvider; + private final BigQueryLabelFactory labelFactory; private final NonEvictableCache clientCache; private final Duration metadataCacheTtl; @@ -50,7 +51,8 @@ public BigQueryClientFactory( BigQueryCredentialsSupplier credentialsSupplier, BigQueryConfig bigQueryConfig, ViewMaterializationCache materializationCache, - HeaderProvider headerProvider) + HeaderProvider headerProvider, + BigQueryLabelFactory labelFactory) { this.identityCacheMapping = requireNonNull(identityCacheMapping, "identityCacheMapping is null"); this.credentialsSupplier = requireNonNull(credentialsSupplier, "credentialsSupplier is null"); @@ -60,6 +62,7 @@ public BigQueryClientFactory( this.caseInsensitiveNameMatching = bigQueryConfig.isCaseInsensitiveNameMatching(); this.materializationCache = requireNonNull(materializationCache, "materializationCache is null"); this.headerProvider = requireNonNull(headerProvider, "headerProvider is null"); + this.labelFactory = requireNonNull(labelFactory, "labelFactory is null"); this.metadataCacheTtl = bigQueryConfig.getMetadataCacheTtl(); CacheBuilder cacheBuilder = CacheBuilder.newBuilder() @@ -77,7 +80,7 @@ public BigQueryClient create(ConnectorSession session) protected BigQueryClient createBigQueryClient(ConnectorSession session) { - return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); + return new BigQueryClient(createBigQuery(session), labelFactory, caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); } protected BigQuery createBigQuery(ConnectorSession session) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index 349eecf9e67c..879ec6436e06 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -19,8 +19,10 @@ import io.airlift.configuration.DefunctConfig; import io.airlift.units.Duration; import io.airlift.units.MinDuration; +import io.trino.plugin.base.logging.SessionInterpolatedValues; import javax.annotation.PostConstruct; +import javax.validation.constraints.AssertTrue; import javax.validation.constraints.Max; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; @@ -28,6 +30,7 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkState; +import static io.trino.plugin.base.logging.FormatInterpolator.hasValidPlaceholders; import static java.util.concurrent.TimeUnit.HOURS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; @@ -57,6 +60,9 @@ public class BigQueryConfig private Duration metadataCacheTtl = new Duration(0, MILLISECONDS); private boolean queryResultsCacheEnabled; + private String queryLabelName; + private String queryLabelFormat; + private int rpcInitialChannelCount = 1; private int rpcMinChannelCount = 1; private int rpcMaxChannelCount = 1; @@ -266,6 +272,38 @@ public BigQueryConfig setQueryResultsCacheEnabled(boolean queryResultsCacheEnabl return this; } + public String getQueryLabelFormat() + { + return queryLabelFormat; + } + + @Config("bigquery.job.label-format") + @ConfigDescription("Adds `bigquery.job.label-name` label to the BigQuery job with provided value format") + public BigQueryConfig setQueryLabelFormat(String queryLabelFormat) + { + this.queryLabelFormat = queryLabelFormat; + return this; + } + + @AssertTrue(message = "Incorrect bigquery.job.label-format may consist of only letters, digits, underscores, commas, spaces, equal signs and predefined values") + boolean isQueryLabelFormatValid() + { + return queryLabelFormat == null || hasValidPlaceholders(queryLabelFormat, SessionInterpolatedValues.values()); + } + + public String getQueryLabelName() + { + return queryLabelName; + } + + @Config("bigquery.job.label-name") + @ConfigDescription("Adds label with the given name to the BigQuery job") + public BigQueryConfig setQueryLabelName(String queryLabelName) + { + this.queryLabelName = queryLabelName; + return this; + } + @Min(1) @Max(MAX_RPC_CONNECTIONS) public int getRpcInitialChannelCount() diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java index 27081053e7ea..ab35b89426fe 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConnectorModule.java @@ -20,6 +20,8 @@ import com.google.inject.Scopes; import com.google.inject.Singleton; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.plugin.base.logging.FormatInterpolator; +import io.trino.plugin.base.logging.SessionInterpolatedValues; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.bigquery.ptf.Query; import io.trino.spi.NodeManager; @@ -81,6 +83,13 @@ public static HeaderProvider createHeaderProvider(NodeManager nodeManager) return FixedHeaderProvider.create("user-agent", "Trino/" + nodeManager.getCurrentNode().getVersion()); } + @Provides + @Singleton + public static BigQueryLabelFactory labelFactory(BigQueryConfig config) + { + return new BigQueryLabelFactory(config.getQueryLabelName(), new FormatInterpolator<>(config.getQueryLabelFormat(), SessionInterpolatedValues.values())); + } + /** * Apache Arrow requires reflective access to certain Java internals prohibited since Java 17. * Adds an error to the {@code binder} if required --add-opens is not passed to the JVM. diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java new file mode 100644 index 000000000000..1586f416dfce --- /dev/null +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryLabelFactory.java @@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.bigquery; + +import com.google.common.base.CharMatcher; +import io.trino.plugin.base.logging.FormatInterpolator; +import io.trino.spi.connector.ConnectorSession; + +import java.util.Map; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Verify.verify; +import static java.util.Objects.requireNonNull; + +public class BigQueryLabelFactory +{ + private static final CharMatcher ALLOWED_CHARS = CharMatcher.inRange('a', 'z') + .or(CharMatcher.inRange('0', '9')) + .or(CharMatcher.anyOf("_-")) + .precomputed(); + + private static final int MAX_LABEL_VALUE_LENGTH = 63; + private final String name; + private final FormatInterpolator interpolator; + + public BigQueryLabelFactory(String labelName, FormatInterpolator interpolator) + { + this.name = labelName; + this.interpolator = requireNonNull(interpolator, "interpolator is null"); + } + + public Map getLabels(ConnectorSession session) + { + if (isNullOrEmpty(name)) { + return Map.of(); + } + + String value = interpolator.interpolate(session).trim(); + if (isNullOrEmpty(value)) { + return Map.of(); + } + + verifyLabelValue(name); + verifyLabelValue(value); + return Map.of(name, value); + } + + private void verifyLabelValue(String value) + { + verify(value.length() <= MAX_LABEL_VALUE_LENGTH, "BigQuery label value cannot be longer than %s characters", MAX_LABEL_VALUE_LENGTH); + verify(ALLOWED_CHARS.matchesAllOf(value), "BigQuery label value can contain only lowercase letters, numeric characters, underscores, and dashes"); + } +} diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index 66fc41f07778..fba723599ca6 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -521,7 +521,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa quote(remoteTableName.getProjectId()), quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName())); - client.executeUpdate(QueryJobConfiguration.of(sql)); + client.executeUpdate(session, QueryJobConfiguration.of(sql)); } @Override @@ -603,7 +603,7 @@ private Optional finishInsert( quote(pageSinkIdColumnName), quote(pageSinkIdColumnName)); - client.executeUpdate(QueryJobConfiguration.of(insertSql)); + client.executeUpdate(session, QueryJobConfiguration.of(insertSql)); } finally { try { @@ -636,8 +636,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table quote(remoteTableName.getProjectId()), quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName())); - client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) + client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } @@ -656,8 +655,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName()), quote(column.getName())); - client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) + client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 2cdf2f858566..5c80862ae7cc 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -30,8 +30,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.util.Objects.requireNonNull; public class BigQueryPageSourceProvider @@ -112,12 +110,11 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List columnHandles, Optional filter) { return new BigQueryQueryPageSource( + session, bigQueryClientFactory.create(session), table, columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()), - filter, - isQueryResultsCacheEnabled(session), - createDisposition(session)); + filter); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 11daeac84d56..125166465f7c 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -15,7 +15,6 @@ import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableList; @@ -27,6 +26,7 @@ import io.trino.spi.block.Block; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ConnectorPageSource; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.type.ArrayType; import io.trino.spi.type.DecimalType; import io.trino.spi.type.Decimals; @@ -80,13 +80,12 @@ public class BigQueryQueryPageSource private boolean finished; public BigQueryQueryPageSource( + ConnectorSession session, BigQueryClient client, BigQueryTableHandle table, List columnNames, List columnTypes, - Optional filter, - boolean useQueryResultsCache, - CreateDisposition createDisposition) + Optional filter) { requireNonNull(client, "client is null"); requireNonNull(table, "table is null"); @@ -97,7 +96,7 @@ public BigQueryQueryPageSource( this.columnTypes = ImmutableList.copyOf(columnTypes); this.pageBuilder = new PageBuilder(columnTypes); String sql = buildSql(table, client.getProjectId(), ImmutableList.copyOf(columnNames), filter); - this.tableResult = client.query(sql, useQueryResultsCache, createDisposition); + this.tableResult = client.executeQuery(session, sql); } private static String buildSql(BigQueryTableHandle table, String projectId, List columnNames, Optional filter) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index 0c4803b8a316..7bc0d673682b 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -50,8 +50,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization; import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -160,7 +158,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl if (filter.isPresent()) { // count the rows based on the filter String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.executeQuery(session, sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { @@ -170,7 +168,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl // (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections. if (TABLE_TYPES.contains(tableInfo.getDefinition().getType())) { String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.executeQuery(session, sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java index 40121b4af216..34a9be05772f 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryConnectorTest.java @@ -16,8 +16,10 @@ import com.google.cloud.bigquery.TableDefinition; import io.airlift.units.Duration; import io.trino.Session; +import io.trino.spi.QueryId; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; +import io.trino.testing.MaterializedResultWithQueryId; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.sql.TestTable; import org.intellij.lang.annotations.Language; @@ -32,6 +34,7 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.Set; +import java.util.function.Function; import static com.google.cloud.bigquery.TableDefinition.Type.EXTERNAL; import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW; @@ -606,6 +609,50 @@ public void testBigQueryExternalTable() } } + @Test + public void testQueryLabeling() + { + Function sessionWithToken = token -> Session.builder(getSession()) + .setTraceToken(Optional.of(token)) + .build(); + + String materializedView = "test_query_label" + randomNameSuffix(); + try { + onBigQuery("CREATE MATERIALIZED VIEW test." + materializedView + " AS SELECT count(1) AS cnt FROM tpch.region"); + + @Language("SQL") + String query = "SELECT * FROM test." + materializedView; + + MaterializedResultWithQueryId result = getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("first_token"), query); + assertLabelForTable(materializedView, result.getQueryId(), "first_token"); + + MaterializedResultWithQueryId result2 = getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("second_token"), query); + assertLabelForTable(materializedView, result2.getQueryId(), "second_token"); + + assertThatThrownBy(() -> getDistributedQueryRunner().executeWithQueryId(sessionWithToken.apply("InvalidToken"), query)) + .hasMessageContaining("BigQuery label value can contain only lowercase letters, numeric characters, underscores, and dashes"); + } + finally { + onBigQuery("DROP MATERIALIZED VIEW IF EXISTS test." + materializedView); + } + } + + private void assertLabelForTable(String expectedView, QueryId queryId, String traceToken) + { + String expectedLabel = "q_" + queryId.toString() + "__t_" + traceToken; + + @Language("SQL") + String checkForLabelQuery = """ + SELECT * FROM region-us.INFORMATION_SCHEMA.JOBS_BY_USER WHERE EXISTS( + SELECT * FROM UNNEST(labels) AS label WHERE label.key = 'trino_query' AND label.value = '%s' + )""".formatted(expectedLabel); + + assertThat(bigQuerySqlExecutor.executeQuery(checkForLabelQuery).getValues()) + .extracting(values -> values.get("query").getStringValue()) + .singleElement() + .matches(statement -> statement.contains(expectedView)); + } + @Test public void testQueryCache() { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java index e1e5cbd9b81b..bb4566271ab9 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryAvroConnectorTest.java @@ -46,7 +46,7 @@ protected QueryRunner createQueryRunner() { return BigQueryQueryRunner.createQueryRunner( ImmutableMap.of(), - ImmutableMap.of(), + ImmutableMap.of("bigquery.job.label-name", "trino_query", "bigquery.job.label-format", "q_$QUERY_ID__t_$TRACE_TOKEN"), REQUIRED_TPCH_TABLES); } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index af1b38892234..5aabfecef005 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -49,6 +49,8 @@ public void testDefaults() .setViewsEnabled(false) .setArrowSerializationEnabled(false) .setQueryResultsCacheEnabled(false) + .setQueryLabelName(null) + .setQueryLabelFormat(null) .setRpcInitialChannelCount(1) .setMinRpcPerChannel(0) .setMaxRpcPerChannel(Integer.MAX_VALUE) @@ -75,6 +77,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.service-cache-ttl", "10d") .put("bigquery.metadata.cache-ttl", "5d") .put("bigquery.query-results-cache.enabled", "true") + .put("bigquery.job.label-name", "trino_job_name") + .put("bigquery.job.label-format", "$TRACE_TOKEN") .put("bigquery.channel-pool.initial-size", "11") .put("bigquery.channel-pool.min-size", "12") .put("bigquery.channel-pool.max-size", "13") @@ -98,6 +102,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setServiceCacheTtl(new Duration(10, DAYS)) .setMetadataCacheTtl(new Duration(5, DAYS)) .setQueryResultsCacheEnabled(true) + .setQueryLabelName("trino_job_name") + .setQueryLabelFormat("$TRACE_TOKEN") .setRpcInitialChannelCount(11) .setRpcMinChannelCount(12) .setRpcMaxChannelCount(13)