Label BigQuery jobs with Trino query id
ConnectorSession needs to be passed to the query/update methods because BigQueryClient is cached using
identityCacheMapping.getRemoteUserCacheKey(), which does not take session properties into account.

We also need access to the query id in order to label queries properly, but we don't want to cache a client per query id.
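
As a concrete illustration of how the pieces added below fit together, here is a minimal sketch that mirrors the @Provides method introduced in BigQueryConnectorModule. The trino-query label name, the $QUERY_ID placeholder, and the wrapper class are illustrative assumptions for this sketch, not values required by the change.

package io.trino.plugin.bigquery;

import io.trino.plugin.base.logging.FormatInterpolator;
import io.trino.plugin.base.logging.SessionInterpolatedValues;
import io.trino.spi.connector.ConnectorSession;

import java.util.Map;

// Hypothetical helper, for illustration only.
public final class BigQueryLabelExample
{
    private BigQueryLabelExample() {}

    public static BigQueryLabelFactory createLabelFactory()
    {
        // Corresponds to the new catalog properties (values are assumed examples):
        //   bigquery.job.label-name=trino-query
        //   bigquery.job.label-format=$QUERY_ID
        BigQueryConfig config = new BigQueryConfig()
                .setQueryLabelName("trino-query")
                .setQueryLabelFormat("$QUERY_ID");

        // Same wiring as the labelFactory provider added to BigQueryConnectorModule
        return new BigQueryLabelFactory(
                config.getQueryLabelName(),
                new FormatInterpolator<>(config.getQueryLabelFormat(), SessionInterpolatedValues.values()));
    }

    public static Map<String, String> labelsFor(ConnectorSession session)
    {
        // Interpolates the session's query id, yielding a map such as
        // {trino-query=20230515_123456_00042_abcde}
        return createLabelFactory().getLabels(session);
    }
}

BigQueryClient then attaches the resulting map to every QueryJobConfiguration it submits, so jobs show up in BigQuery labeled with the originating Trino query id.
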
wendigo committed May 15, 2023
1 parent f122fbe commit 56af278
Showing 12 changed files with 205 additions and 38 deletions.
BigQueryClient.java
@@ -22,7 +22,6 @@
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.JobStatistics.QueryStatistics;
import com.google.cloud.bigquery.QueryJobConfiguration;
@@ -40,6 +39,7 @@
import io.airlift.units.Duration;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;

@@ -68,6 +68,8 @@
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
@@ -81,19 +83,22 @@ public class BigQueryClient
static final Set<TableDefinition.Type> TABLE_TYPES = ImmutableSet.of(TABLE, VIEW, MATERIALIZED_VIEW, EXTERNAL, SNAPSHOT);

private final BigQuery bigQuery;
private final BigQueryLabelFactory labelFactory;
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
private final LoadingCache<String, List<Dataset>> remoteDatasetCache;
private final Optional<String> configProjectId;

public BigQueryClient(
BigQuery bigQuery,
BigQueryLabelFactory labelFactory,
boolean caseInsensitiveNameMatching,
ViewMaterializationCache materializationCache,
Duration metadataCacheTtl,
Optional<String> configProjectId)
{
this.bigQuery = requireNonNull(bigQuery, "bigQuery is null");
this.labelFactory = requireNonNull(labelFactory, "labelFactory is null");
this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
this.remoteDatasetCache = EvictableCacheBuilder.newBuilder()
@@ -266,30 +271,33 @@ Job create(JobInfo jobInfo)
return bigQuery.create(jobInfo);
}

public void executeUpdate(QueryJobConfiguration job)
public void executeUpdate(ConnectorSession session, QueryJobConfiguration job)
{
log.debug("Execute query: %s", job.getQuery());
try {
bigQuery.query(job);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e);
}
execute(session, job);
}

public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition)
public TableResult executeQuery(ConnectorSession session, String sql)
{
log.debug("Execute query: %s", sql);
QueryJobConfiguration job = QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(isQueryResultsCacheEnabled(session))
.setCreateDisposition(createDisposition(session))
.build();
return execute(session, job);
}

private TableResult execute(ConnectorSession session, QueryJobConfiguration job)
{
QueryJobConfiguration jobWithQueryLabel = job.toBuilder()
.setLabels(labelFactory.getLabels(session))
.build();
try {
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryResultsCache)
.setCreateDisposition(createDisposition)
.build());
return bigQuery.query(jobWithQueryLabel);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", job.getQuery()), e);
}
}

BigQueryClientFactory.java
@@ -41,6 +41,7 @@ public class BigQueryClientFactory
private final boolean caseInsensitiveNameMatching;
private final ViewMaterializationCache materializationCache;
private final HeaderProvider headerProvider;
private final BigQueryLabelFactory labelFactory;
private final NonEvictableCache<IdentityCacheMapping.IdentityCacheKey, BigQueryClient> clientCache;
private final Duration metadataCacheTtl;

@@ -50,7 +51,8 @@ public BigQueryClientFactory(
BigQueryCredentialsSupplier credentialsSupplier,
BigQueryConfig bigQueryConfig,
ViewMaterializationCache materializationCache,
HeaderProvider headerProvider)
HeaderProvider headerProvider,
BigQueryLabelFactory labelFactory)
{
this.identityCacheMapping = requireNonNull(identityCacheMapping, "identityCacheMapping is null");
this.credentialsSupplier = requireNonNull(credentialsSupplier, "credentialsSupplier is null");
@@ -60,6 +62,7 @@
this.caseInsensitiveNameMatching = bigQueryConfig.isCaseInsensitiveNameMatching();
this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
this.headerProvider = requireNonNull(headerProvider, "headerProvider is null");
this.labelFactory = requireNonNull(labelFactory, "labelFactory is null");
this.metadataCacheTtl = bigQueryConfig.getMetadataCacheTtl();

CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
@@ -77,7 +80,7 @@ public BigQueryClient create(ConnectorSession session)

protected BigQueryClient createBigQueryClient(ConnectorSession session)
{
return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId);
return new BigQueryClient(createBigQuery(session), labelFactory, caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId);
}

protected BigQuery createBigQuery(ConnectorSession session)
BigQueryConfig.java
@@ -19,15 +19,18 @@
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;
import io.trino.plugin.base.logging.SessionInterpolatedValues;

import javax.annotation.PostConstruct;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.Optional;

import static com.google.common.base.Preconditions.checkState;
import static io.trino.plugin.base.logging.FormatInterpolator.hasValidPlaceholders;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
@@ -57,6 +60,9 @@ public class BigQueryConfig
private Duration metadataCacheTtl = new Duration(0, MILLISECONDS);
private boolean queryResultsCacheEnabled;

private String queryLabelName;
private String queryLabelFormat;

private int rpcInitialChannelCount = 1;
private int rpcMinChannelCount = 1;
private int rpcMaxChannelCount = 1;
@@ -266,6 +272,38 @@ public BigQueryConfig setQueryResultsCacheEnabled(boolean queryResultsCacheEnabl
return this;
}

public String getQueryLabelFormat()
{
return queryLabelFormat;
}

@Config("bigquery.job.label-format")
@ConfigDescription("Adds `bigquery.job.label-name` label to the BigQuery job with provided value format")
public BigQueryConfig setQueryLabelFormat(String queryLabelFormat)
{
this.queryLabelFormat = queryLabelFormat;
return this;
}

@AssertTrue(message = "Incorrect bigquery.job.label-format may consist of only letters, digits, underscores, commas, spaces, equal signs and predefined values")
boolean isQueryLabelFormatValid()
{
return queryLabelFormat == null || hasValidPlaceholders(queryLabelFormat, SessionInterpolatedValues.values());
}

public String getQueryLabelName()
{
return queryLabelName;
}

@Config("bigquery.job.label-name")
@ConfigDescription("Adds label with the given name to the BigQuery job")
public BigQueryConfig setQueryLabelName(String queryLabelName)
{
this.queryLabelName = queryLabelName;
return this;
}

@Min(1)
@Max(MAX_RPC_CONNECTIONS)
public int getRpcInitialChannelCount()
BigQueryConnectorModule.java
@@ -20,6 +20,8 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.logging.FormatInterpolator;
import io.trino.plugin.base.logging.SessionInterpolatedValues;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.plugin.bigquery.ptf.Query;
import io.trino.spi.NodeManager;
@@ -81,6 +83,13 @@ public static HeaderProvider createHeaderProvider(NodeManager nodeManager)
return FixedHeaderProvider.create("user-agent", "Trino/" + nodeManager.getCurrentNode().getVersion());
}

@Provides
@Singleton
public static BigQueryLabelFactory labelFactory(BigQueryConfig config)
{
return new BigQueryLabelFactory(config.getQueryLabelName(), new FormatInterpolator<>(config.getQueryLabelFormat(), SessionInterpolatedValues.values()));
}

/**
* Apache Arrow requires reflective access to certain Java internals prohibited since Java 17.
* Adds an error to the {@code binder} if required --add-opens is not passed to the JVM.
BigQueryLabelFactory.java (new file)
@@ -0,0 +1,64 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.common.base.CharMatcher;
import io.trino.plugin.base.logging.FormatInterpolator;
import io.trino.spi.connector.ConnectorSession;

import java.util.Map;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

public class BigQueryLabelFactory
{
private static final CharMatcher ALLOWED_CHARS = CharMatcher.inRange('a', 'z')
.or(CharMatcher.inRange('0', '9'))
.or(CharMatcher.anyOf("_-"))
.precomputed();

private static final int MAX_LABEL_VALUE_LENGTH = 63;
private final String name;
private final FormatInterpolator<ConnectorSession> interpolator;

public BigQueryLabelFactory(String labelName, FormatInterpolator<ConnectorSession> interpolator)
{
this.name = labelName;
this.interpolator = requireNonNull(interpolator, "interpolator is null");
}

public Map<String, String> getLabels(ConnectorSession session)
{
if (isNullOrEmpty(name)) {
return Map.of();
}

String value = interpolator.interpolate(session).trim();
if (isNullOrEmpty(value)) {
return Map.of();
}

verifyLabelValue(name);
verifyLabelValue(value);
return Map.of(name, value);
}

private void verifyLabelValue(String value)
{
verify(value.length() <= MAX_LABEL_VALUE_LENGTH, "BigQuery label value cannot be longer than %s characters", MAX_LABEL_VALUE_LENGTH);
verify(ALLOWED_CHARS.matchesAllOf(value), "BigQuery label value can contain only lowercase letters, numeric characters, underscores, and dashes");
}
}
BigQueryMetadata.java
@@ -521,7 +521,7 @@ public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHa
quote(remoteTableName.getProjectId()),
quote(remoteTableName.getDatasetName()),
quote(remoteTableName.getTableName()));
client.executeUpdate(QueryJobConfiguration.of(sql));
client.executeUpdate(session, QueryJobConfiguration.of(sql));
}

@Override
@@ -603,7 +603,7 @@ private Optional<ConnectorOutputMetadata> finishInsert(
quote(pageSinkIdColumnName),
quote(pageSinkIdColumnName));

client.executeUpdate(QueryJobConfiguration.of(insertSql));
client.executeUpdate(session, QueryJobConfiguration.of(insertSql));
}
finally {
try {
@@ -636,8 +636,7 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table
quote(remoteTableName.getProjectId()),
quote(remoteTableName.getDatasetName()),
quote(remoteTableName.getTableName()));
client.executeUpdate(QueryJobConfiguration.newBuilder(sql)
.setQuery(sql)
client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql)
.addPositionalParameter(QueryParameterValue.string(newComment.orElse(null)))
.build());
}
@@ -656,8 +655,7 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl
quote(remoteTableName.getDatasetName()),
quote(remoteTableName.getTableName()),
quote(column.getName()));
client.executeUpdate(QueryJobConfiguration.newBuilder(sql)
.setQuery(sql)
client.executeUpdate(session, QueryJobConfiguration.newBuilder(sql)
.addPositionalParameter(QueryParameterValue.string(newComment.orElse(null)))
.build());
}
BigQueryPageSourceProvider.java
@@ -30,8 +30,6 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled;
import static java.util.Objects.requireNonNull;

public class BigQueryPageSourceProvider
@@ -112,12 +110,11 @@ private ConnectorPageSource createStoragePageSource(ConnectorSession session, Bi
private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQueryTableHandle table, List<BigQueryColumnHandle> columnHandles, Optional<String> filter)
{
return new BigQueryQueryPageSource(
session,
bigQueryClientFactory.create(session),
table,
columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()),
columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()),
filter,
isQueryResultsCacheEnabled(session),
createDisposition(session));
filter);
}
}
BigQueryQueryPageSource.java
@@ -15,7 +15,6 @@

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableList;
@@ -27,6 +26,7 @@
import io.trino.spi.block.Block;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.DecimalType;
import io.trino.spi.type.Decimals;
@@ -80,13 +80,12 @@ public class BigQueryQueryPageSource
private boolean finished;

public BigQueryQueryPageSource(
ConnectorSession session,
BigQueryClient client,
BigQueryTableHandle table,
List<String> columnNames,
List<Type> columnTypes,
Optional<String> filter,
boolean useQueryResultsCache,
CreateDisposition createDisposition)
Optional<String> filter)
{
requireNonNull(client, "client is null");
requireNonNull(table, "table is null");
@@ -97,7 +96,7 @@ public BigQueryQueryPageSource(
this.columnTypes = ImmutableList.copyOf(columnTypes);
this.pageBuilder = new PageBuilder(columnTypes);
String sql = buildSql(table, client.getProjectId(), ImmutableList.copyOf(columnNames), filter);
this.tableResult = client.query(sql, useQueryResultsCache, createDisposition);
this.tableResult = client.executeQuery(session, sql);
}

private static String buildSql(BigQueryTableHandle table, String projectId, List<String> columnNames, Optional<String> filter)