From a958faae09d80f9baac5c96ac20bf2bb6dfdbcf4 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Mon, 16 May 2022 16:51:18 +0900 Subject: [PATCH] Support query cache option in BigQuery --- docs/src/main/sphinx/connector/bigquery.rst | 2 ++ .../trino/plugin/bigquery/BigQueryClient.java | 15 +++++++--- .../trino/plugin/bigquery/BigQueryConfig.java | 13 ++++++++ .../bigquery/BigQueryPageSourceProvider.java | 6 +++- .../bigquery/BigQueryQueryPageSource.java | 7 +++-- .../bigquery/BigQuerySessionProperties.java | 25 ++++++++++++++++ .../plugin/bigquery/BigQuerySplitManager.java | 6 ++-- .../plugin/bigquery/TestBigQueryConfig.java | 7 +++-- .../bigquery/TestBigQueryConnectorTest.java | 30 +++++++++++++++++++ 9 files changed, 100 insertions(+), 11 deletions(-) diff --git a/docs/src/main/sphinx/connector/bigquery.rst b/docs/src/main/sphinx/connector/bigquery.rst index afc43bf9e228..e2de905798b5 100644 --- a/docs/src/main/sphinx/connector/bigquery.rst +++ b/docs/src/main/sphinx/connector/bigquery.rst @@ -137,6 +137,8 @@ Property Description ``bigquery.credentials-key`` The base64 encoded credentials key None. See the `requirements <#requirements>`_ section. ``bigquery.credentials-file`` The path to the JSON credentials file None. See the `requirements <#requirements>`_ section. ``bigquery.case-insensitive-name-matching`` Match dataset and table names case-insensitively ``false`` +``bigquery.query-results-cache.enabled`` Enable `query results cache + `_ ``false`` ===================================================== ============================================================== ====================================================== Data types diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java index f8f126e2db0b..c9cefe32548f 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryClient.java @@ -20,6 +20,7 @@ import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; @@ -208,10 +209,13 @@ Job create(JobInfo jobInfo) return bigQuery.create(jobInfo); } - public TableResult query(String sql) + public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition) { try { - return bigQuery.query(QueryJobConfiguration.of(sql)); + return bigQuery.query(QueryJobConfiguration.newBuilder(sql) + .setUseQueryCache(useQueryResultsCache) + .setCreateDisposition(createDisposition) + .build()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -219,12 +223,15 @@ public TableResult query(String sql) } } - public TableResult query(TableId table, List requiredColumns, Optional filter) + public TableResult query(TableId table, List requiredColumns, Optional filter, boolean useQueryResultsCache, CreateDisposition createDisposition) { String sql = selectSql(table, requiredColumns, filter); log.debug("Execute query: %s", sql); try { - return bigQuery.query(QueryJobConfiguration.of(sql)); + return bigQuery.query(QueryJobConfiguration.newBuilder(sql) + .setUseQueryCache(useQueryResultsCache) + .setCreateDisposition(createDisposition) + .build()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java index 44ffe49b4e9f..934e2e2fc962 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryConfig.java @@ -48,6 +48,7 @@ public class BigQueryConfig private boolean caseInsensitiveNameMatching; private Duration viewsCacheTtl = new Duration(15, MINUTES); private Duration serviceCacheTtl = new Duration(3, MINUTES); + private boolean queryResultsCacheEnabled; public Optional getProjectId() { @@ -212,6 +213,18 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl) return this; } + public boolean isQueryResultsCacheEnabled() + { + return queryResultsCacheEnabled; + } + + @Config("bigquery.query-results-cache.enabled") + public BigQueryConfig setQueryResultsCacheEnabled(boolean queryResultsCacheEnabled) + { + this.queryResultsCacheEnabled = queryResultsCacheEnabled; + return this; + } + @PostConstruct public void validate() { diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java index eb17b3575785..4db8118f7889 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSourceProvider.java @@ -30,6 +30,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static java.util.Objects.requireNonNull; public class BigQueryPageSourceProvider @@ -104,6 +106,8 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ table, columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()), columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()), - filter); + filter, + isQueryResultsCacheEnabled(session), + createDisposition(session)); } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java index 9574ac2f6c13..2becc818854e 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryQueryPageSource.java @@ -15,6 +15,7 @@ import com.google.cloud.bigquery.FieldValue; import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableResult; import com.google.common.collect.ImmutableList; @@ -83,7 +84,9 @@ public BigQueryQueryPageSource( BigQueryTableHandle table, List columnNames, List columnTypes, - Optional filter) + Optional filter, + boolean useQueryResultsCache, + CreateDisposition createDisposition) { requireNonNull(client, "client is null"); requireNonNull(table, "table is null"); @@ -94,7 +97,7 @@ public BigQueryQueryPageSource( this.columnTypes = ImmutableList.copyOf(columnTypes); this.pageBuilder = new PageBuilder(columnTypes); TableId tableId = TableId.of(client.getProjectId(), table.getRemoteTableName().getDatasetName(), table.getRemoteTableName().getTableName()); - this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter); + this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter, useQueryResultsCache, createDisposition); } @Override diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java index a76f14cbea17..90f17f30d562 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySessionProperties.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.bigquery; +import com.google.cloud.bigquery.JobInfo.CreateDisposition; import com.google.common.collect.ImmutableList; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.spi.connector.ConnectorSession; @@ -23,11 +24,14 @@ import java.util.List; import static io.trino.spi.session.PropertyMetadata.booleanProperty; +import static io.trino.spi.session.PropertyMetadata.enumProperty; public final class BigQuerySessionProperties implements SessionPropertiesProvider { private static final String SKIP_VIEW_MATERIALIZATION = "skip_view_materialization"; + private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled"; + private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type"; private final List> sessionProperties; @@ -40,6 +44,17 @@ public BigQuerySessionProperties(BigQueryConfig config) "Skip materializing views", config.isSkipViewMaterialization(), false)) + .add(booleanProperty( + QUERY_RESULTS_CACHE_ENABLED, + "Enable query results cache", + config.isQueryResultsCacheEnabled(), + false)) + .add(enumProperty( + CREATE_DISPOSITION_TYPE, + "Create disposition type", + CreateDisposition.class, + CreateDisposition.CREATE_IF_NEEDED, // https://cloud.google.com/bigquery/docs/cached-results + true)) .build(); } @@ -53,4 +68,14 @@ public static boolean isSkipViewMaterialization(ConnectorSession session) { return session.getProperty(SKIP_VIEW_MATERIALIZATION, Boolean.class); } + + public static boolean isQueryResultsCacheEnabled(ConnectorSession session) + { + return session.getProperty(QUERY_RESULTS_CACHE_ENABLED, Boolean.class); + } + + public static CreateDisposition createDisposition(ConnectorSession session) + { + return session.getProperty(CREATE_DISPOSITION_TYPE, CreateDisposition.class); + } } diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java index 35485ca3aecb..a479fe66b3ae 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQuerySplitManager.java @@ -46,6 +46,8 @@ import static com.google.cloud.bigquery.TableDefinition.Type.VIEW; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition; +import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled; import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.util.Objects.requireNonNull; @@ -139,7 +141,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl if (filter.isPresent()) { // count the rows based on the filter String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql); + TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { @@ -151,7 +153,7 @@ private List createEmptyProjection(ConnectorSession session, Tabl } else if (tableInfo.getDefinition().getType() == VIEW) { String sql = client.selectSql(remoteTableId, "COUNT(*)"); - TableResult result = client.query(sql); + TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session)); numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue(); } else { diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java index 432449d23d66..d7a3f04267ec 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConfig.java @@ -44,7 +44,8 @@ public void testDefaults() .setCaseInsensitiveNameMatching(false) .setViewsCacheTtl(new Duration(15, MINUTES)) .setServiceCacheTtl(new Duration(3, MINUTES)) - .setViewsEnabled(false)); + .setViewsEnabled(false) + .setQueryResultsCacheEnabled(false)); } @Test @@ -63,6 +64,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .put("bigquery.case-insensitive-name-matching", "true") .put("bigquery.views-cache-ttl", "1m") .put("bigquery.service-cache-ttl", "10d") + .put("bigquery.query-results-cache.enabled", "true") .buildOrThrow(); BigQueryConfig expected = new BigQueryConfig() @@ -77,7 +79,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey() .setMaxReadRowsRetries(10) .setCaseInsensitiveNameMatching(true) .setViewsCacheTtl(new Duration(1, MINUTES)) - .setServiceCacheTtl(new Duration(10, DAYS)); + .setServiceCacheTtl(new Duration(10, DAYS)) + .setQueryResultsCacheEnabled(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConnectorTest.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConnectorTest.java index 274296268e37..8f15601f37b3 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConnectorTest.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/TestBigQueryConnectorTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import io.trino.Session; import io.trino.testing.BaseConnectorTest; import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; @@ -621,6 +622,35 @@ public void testBigQuerySnapshotTable() } } + @Test + public void testQueryCache() + { + Session queryResultsCacheSession = Session.builder(getSession()) + .setCatalogSessionProperty("bigquery", "query_results_cache_enabled", "true") + .build(); + Session createNeverDisposition = Session.builder(getSession()) + .setCatalogSessionProperty("bigquery", "query_results_cache_enabled", "true") + .setCatalogSessionProperty("bigquery", "create_disposition_type", "create_never") + .build(); + + String materializedView = "test_materialized_view" + randomTableSuffix(); + try { + onBigQuery("CREATE MATERIALIZED VIEW test." + materializedView + " AS SELECT count(1) AS cnt FROM tpch.region"); + + // Verify query cache is empty + assertThatThrownBy(() -> query(createNeverDisposition, "SELECT * FROM test." + materializedView)) + .hasMessageContaining("Not found"); + // Populate cache and verify it + assertQuery(queryResultsCacheSession, "SELECT * FROM test." + materializedView, "VALUES 5"); + assertQuery(createNeverDisposition, "SELECT * FROM test." + materializedView, "VALUES 5"); + + assertUpdate("DROP TABLE test." + materializedView); + } + finally { + onBigQuery("DROP MATERIALIZED VIEW IF EXISTS test." + materializedView); + } + } + private void onBigQuery(@Language("SQL") String sql) { bigQuerySqlExecutor.execute(sql);