From ebe9ec2c132a7310d578296212ea837b01b9339a Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Mon, 20 Feb 2023 16:47:32 +0100 Subject: [PATCH] Label BigQuery jobs with Trino query id --- .../trino/plugin/bigquery/BigQueryClient.java | 28 ++++++++++++++----- .../bigquery/BigQueryClientFactory.java | 2 +- .../plugin/bigquery/BigQueryMetadata.java | 2 -- .../bigquery/BigQueryPageSourceProvider.java | 6 +--- .../bigquery/BigQueryQueryPageSource.java | 7 ++--- .../plugin/bigquery/BigQuerySplitManager.java | 6 ++-- 6 files changed, 27 insertions(+), 24 deletions(-) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index 58e4c0b063a..e195c0c378b 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -22,7 +22,6 @@ import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.JobStatistics; import com.google.cloud.bigquery.JobStatistics.QueryStatistics; import com.google.cloud.bigquery.QueryJobConfiguration; @@ -35,11 +34,13 @@ import com.google.cloud.http.BaseHttpServiceException; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import io.airlift.log.Logger; import io.airlift.units.Duration; import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.spi.TrinoException; +import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.TableNotFoundException; @@ -65,6 +66,8 @@ import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.lang.String.format; import static java.util.Locale.ENGLISH; import static java.util.Objects.requireNonNull; @@ -75,6 +78,7 @@ public class BigQueryClient { private static final Logger log = Logger.get(BigQueryClient.class); + private final ConnectorSession session; private final BigQuery bigQuery; private final ViewMaterializationCache materializationCache; private final boolean caseInsensitiveNameMatching; @@ -82,12 +86,14 @@ public class BigQueryClient private final Optional configProjectId; public BigQueryClient( + ConnectorSession session, BigQuery bigQuery, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache, Duration metadataCacheTtl, Optional configProjectId) { + this.session = requireNonNull(session, "session is null"); this.bigQuery = requireNonNull(bigQuery, "bigQuery is null"); this.materializationCache = requireNonNull(materializationCache, "materializationCache is null"); this.caseInsensitiveNameMatching = caseInsensitiveNameMatching; @@ -266,7 +272,7 @@ public void executeUpdate(QueryJobConfiguration job) { log.debug("Execute query: %s", job.getQuery()); try { - bigQuery.query(job); + bigQuery.query(setQueryLabel(job)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -274,14 +280,15 @@ public void executeUpdate(QueryJobConfiguration job) } } - public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition) + public TableResult query(String sql) { log.debug("Execute query: %s", sql); + QueryJobConfiguration job = QueryJobConfiguration.newBuilder(sql) + .setUseQueryCache(isQueryResultsCacheEnabled(session)) + .setCreateDisposition(createDisposition(session)) + .build(); try { - return bigQuery.query(QueryJobConfiguration.newBuilder(sql) - .setUseQueryCache(useQueryResultsCache) - .setCreateDisposition(createDisposition) - .build()); + return bigQuery.query(setQueryLabel(job)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -289,6 +296,13 @@ public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposi } } + private QueryJobConfiguration setQueryLabel(QueryJobConfiguration job) + { + return job.toBuilder() + .setLabels(ImmutableMap.of("trino_query_id", session.getQueryId())) + .build(); + } + public Schema getSchema(String sql) { log.debug("Get schema from query: %s", sql); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java index bc1a4d0280c..8e1436367f1 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClientFactory.java @@ -77,7 +77,7 @@ public BigQueryClient create(ConnectorSession session) protected BigQueryClient createBigQueryClient(ConnectorSession session) { - return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); + return new BigQueryClient(session, createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl, projectId); } protected BigQuery createBigQuery(ConnectorSession session) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java index 78250e3cf15..f7acafda959 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryMetadata.java @@ -643,7 +643,6 @@ public void setTableComment(ConnectorSession session, ConnectorTableHandle table quote(remoteTableName.getDatasetName()), quote(remoteTableName.getTableName())); client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } @@ -663,7 +662,6 @@ public void setColumnComment(ConnectorSession session, ConnectorTableHandle tabl quote(remoteTableName.getTableName()), quote(column.getName())); client.executeUpdate(QueryJobConfiguration.newBuilder(sql) - .setQuery(sql) .addPositionalParameter(QueryParameterValue.string(newComment.orElse(null))) .build()); } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index 2cdf2f85856..42fdf139bf8 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -30,8 +30,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.util.Objects.requireNonNull; public class BigQueryPageSourceProvider @@ -116,8 +114,6 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ table, columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()), - filter, - isQueryResultsCacheEnabled(session), - createDisposition(session)); + filter); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 11daeac84d5..658698dbbb9 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -15,7 +15,6 @@ import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; -import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableList; @@ -84,9 +83,7 @@ public BigQueryQueryPageSource( BigQueryTableHandle table, List columnNames, List columnTypes, - Optional filter, - boolean useQueryResultsCache, - CreateDisposition createDisposition) + Optional filter) { requireNonNull(client, "client is null"); requireNonNull(table, "table is null"); @@ -97,7 +94,7 @@ public BigQueryQueryPageSource( this.columnTypes = ImmutableList.copyOf(columnTypes); this.pageBuilder = new PageBuilder(columnTypes); String sql = buildSql(table, client.getProjectId(), ImmutableList.copyOf(columnNames), filter); - this.tableResult = client.query(sql, useQueryResultsCache, createDisposition); + this.tableResult = client.query(sql); } private static String buildSql(BigQueryTableHandle table, String projectId, List columnNames, Optional filter) diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index c8a369581c9..a99ec763715 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -50,8 +50,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; -import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization; import static io.trino.plugin.bigquery.BigQueryUtil.isWildcardTable; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; @@ -160,7 +158,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl if (filter.isPresent()) { // count the rows based on the filter String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.query(sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { @@ -170,7 +168,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl // (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections. if (tableInfo.getDefinition().getType() == TABLE || tableInfo.getDefinition().getType() == VIEW) { String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); + TableResult result = client.query(sql); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else {