Skip to content

Commit

Permalink
Add dataset cache to BigQuery connector
Browse files Browse the repository at this point in the history
When caseInsensitiveNameMatching=true, the BigQueryClient
makes a call to listDatasets for each call on toRemoteDataset.
This is problematic when running queries like SHOW SCHEMAS
since listDatasets is called once to get the initial list,
then once again for each schema. This is expensive if the
BigQuery project has a large number of datasets.

This commit adds a cache for dataset listings, which drastically
reduces the query time for SHOW SCHEMAS.
  • Loading branch information
adamjshook committed Nov 2, 2022
1 parent c7e0ecc commit b69938f
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 3 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ Property Description
``BIGNUMERIC`` and ``TIMESTAMP`` types are unsupported. ``false``
``bigquery.views-cache-ttl`` Duration for which the materialization of a view will be ``15m``
cached and reused. Set to ``0ms`` to disable the cache.
``bigquery.metadata.cache-ttl`` Duration for which metadata retrieved from BigQuery ``0ms``
is cached and reused. Set to ``0ms`` to disable the cache.
``bigquery.max-read-rows-retries`` The number of retries in case of retryable server issues ``3``
``bigquery.credentials-key`` The base64 encoded credentials key None. See the `requirements <#requirements>`_ section.
``bigquery.credentials-file`` The path to the JSON credentials file None. See the `requirements <#requirements>`_ section.
Expand Down
2 changes: 2 additions & 0 deletions plugin/trino-bigquery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@
<excludes>
<!-- If you are adding entry here also add an entry to cloud-tests or cloud-tests-case-insensitive-mapping profile below -->
<exclude>**/TestBigQueryConnectorTest.java</exclude>
<exclude>**/TestBigQueryMetadataCaching.java</exclude>
<exclude>**/TestBigQueryTypeMapping.java</exclude>
<exclude>**/TestBigQueryMetadata.java</exclude>
<exclude>**/TestBigQueryInstanceCleaner.java</exclude>
Expand Down Expand Up @@ -415,6 +416,7 @@
<configuration>
<includes>
<include>**/TestBigQueryConnectorTest.java</include>
<include>**/TestBigQueryMetadataCaching.java</include>
<include>**/TestBigQueryTypeMapping.java</include>
<include>**/TestBigQueryMetadata.java</include>
<include>**/TestBigQueryInstanceCleaner.java</include>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.collect.cache.EvictableCacheBuilder;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.TableNotFoundException;

Expand All @@ -45,6 +48,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

import static com.google.cloud.bigquery.JobStatistics.QueryStatistics.StatementType.SELECT;
Expand All @@ -58,9 +62,11 @@
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_AMBIGUOUS_OBJECT_NAME;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_INVALID_STATEMENT;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.joining;

public class BigQueryClient
Expand All @@ -70,12 +76,17 @@ public class BigQueryClient
private final BigQuery bigQuery;
private final ViewMaterializationCache materializationCache;
private final boolean caseInsensitiveNameMatching;
private final LoadingCache<String, List<Dataset>> remoteDatasetCache;

/**
 * Creates a client that wraps the given BigQuery service and caches dataset listings.
 *
 * @param bigQuery the BigQuery service used for all remote calls; must not be null
 * @param caseInsensitiveNameMatching whether remote object names are matched case-insensitively
 * @param materializationCache cache of materialized views; must not be null
 * @param metadataCacheTtl how long a dataset listing stays cached after it is loaded; must not be null
 */
public BigQueryClient(BigQuery bigQuery, boolean caseInsensitiveNameMatching, ViewMaterializationCache materializationCache, Duration metadataCacheTtl)
{
    this.bigQuery = requireNonNull(bigQuery, "bigQuery is null");
    this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
    this.caseInsensitiveNameMatching = caseInsensitiveNameMatching;
    // Validate explicitly, like the other arguments, instead of failing with an anonymous NPE on toMillis()
    requireNonNull(metadataCacheTtl, "metadataCacheTtl is null");
    // Dataset listings are keyed by project id and loaded on demand through listDatasetsFromBigQuery.
    // NOTE(review): shareNothingWhenDisabled() presumably makes a 0ms TTL behave as "no caching" — confirm against EvictableCacheBuilder.
    this.remoteDatasetCache = EvictableCacheBuilder.newBuilder()
            .expireAfterWrite(metadataCacheTtl.toMillis(), MILLISECONDS)
            .shareNothingWhenDisabled()
            .build(CacheLoader.from(this::listDatasetsFromBigQuery));
}

public Optional<RemoteDatabaseObject> toRemoteDataset(String projectId, String datasetName)
Expand Down Expand Up @@ -175,7 +186,18 @@ public String getProjectId()

/**
 * Lists the datasets of the given project, served through {@code remoteDatasetCache}.
 *
 * @param projectId the BigQuery project whose datasets are listed; also the cache key
 * @return the datasets of the project, possibly from a previously cached listing
 * @throws TrinoException with {@code BIGQUERY_LISTING_DATASET_ERROR} when loading the listing fails
 */
public Iterable<Dataset> listDatasets(String projectId)
{
    try {
        return remoteDatasetCache.get(projectId);
    }
    catch (ExecutionException | UncheckedExecutionException e) {
        // The loader declares no checked exceptions, so its failures surface from LoadingCache.get
        // as UncheckedExecutionException; catching only ExecutionException would let them escape unmapped.
        throw new TrinoException(BIGQUERY_LISTING_DATASET_ERROR, "Failed to retrieve datasets from BigQuery", e);
    }
}

/**
 * Fetches every dataset of the given project from BigQuery and copies the result
 * into an immutable list. Used as the loader of {@code remoteDatasetCache}.
 *
 * @param projectId the BigQuery project to list
 * @return an immutable list of all datasets returned for the project
 */
private List<Dataset> listDatasetsFromBigQuery(String projectId)
{
    // iterateAll() walks all result pages; collecting eagerly gives the cache a fully materialized value
    Iterable<Dataset> remoteDatasets = bigQuery.listDatasets(projectId).iterateAll();
    return stream(remoteDatasets).collect(toImmutableList());
}

public Iterable<Table> listTables(DatasetId remoteDatasetId, TableDefinition.Type... types)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.common.cache.CacheBuilder;
import io.airlift.units.Duration;
import io.trino.collect.cache.NonEvictableCache;
import io.trino.spi.connector.ConnectorSession;

Expand All @@ -40,6 +41,7 @@ public class BigQueryClientFactory
private final ViewMaterializationCache materializationCache;
private final HeaderProvider headerProvider;
private final NonEvictableCache<IdentityCacheMapping.IdentityCacheKey, BigQueryClient> clientCache;
private final Duration metadataCacheTtl;

@Inject
public BigQueryClientFactory(
Expand All @@ -56,6 +58,7 @@ public BigQueryClientFactory(
this.caseInsensitiveNameMatching = bigQueryConfig.isCaseInsensitiveNameMatching();
this.materializationCache = requireNonNull(materializationCache, "materializationCache is null");
this.headerProvider = requireNonNull(headerProvider, "headerProvider is null");
this.metadataCacheTtl = bigQueryConfig.getMetadataCacheTtl();

CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder()
.expireAfterWrite(bigQueryConfig.getServiceCacheTtl().toMillis(), MILLISECONDS);
Expand All @@ -72,7 +75,7 @@ public BigQueryClient create(ConnectorSession session)

/**
 * Builds a new {@link BigQueryClient} for the given session from a freshly created
 * BigQuery service and this factory's case-insensitivity flag, view materialization
 * cache and metadata cache TTL.
 *
 * @param session the connector session passed on to {@code createBigQuery}
 * @return a new client instance
 */
protected BigQueryClient createBigQueryClient(ConnectorSession session)
{
return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache);
return new BigQueryClient(createBigQuery(session), caseInsensitiveNameMatching, materializationCache, metadataCacheTtl);
}

protected BigQuery createBigQuery(ConnectorSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;

@DefunctConfig("bigquery.case-insensitive-name-matching.cache-ttl")
Expand All @@ -51,6 +52,7 @@ public class BigQueryConfig
private boolean caseInsensitiveNameMatching;
private Duration viewsCacheTtl = new Duration(15, MINUTES);
private Duration serviceCacheTtl = new Duration(3, MINUTES);
private Duration metadataCacheTtl = new Duration(0, MILLISECONDS);
private boolean queryResultsCacheEnabled;

private int rpcInitialChannelCount = 1;
Expand Down Expand Up @@ -222,6 +224,21 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl)
return this;
}

/**
 * Returns the duration for which BigQuery client metadata is cached.
 * The field is initialized to 0 milliseconds; the annotations below constrain
 * the value to be non-null and at least {@code 0ms}.
 *
 * @return the configured metadata cache TTL
 */
@NotNull
@MinDuration("0ms")
public Duration getMetadataCacheTtl()
{
return metadataCacheTtl;
}

/**
 * Sets the metadata cache TTL; bound to the {@code bigquery.metadata.cache-ttl} property.
 *
 * @param metadataCacheTtl the duration to store as the metadata cache TTL
 * @return this config instance, so setters can be chained
 */
@Config("bigquery.metadata.cache-ttl")
@ConfigDescription("Duration for which BigQuery client metadata is cached after listing")
public BigQueryConfig setMetadataCacheTtl(Duration metadataCacheTtl)
{
this.metadataCacheTtl = metadataCacheTtl;
return this;
}

public boolean isQueryResultsCacheEnabled()
{
return queryResultsCacheEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -44,6 +45,7 @@ public void testDefaults()
.setCaseInsensitiveNameMatching(false)
.setViewsCacheTtl(new Duration(15, MINUTES))
.setServiceCacheTtl(new Duration(3, MINUTES))
.setMetadataCacheTtl(new Duration(0, MILLISECONDS))
.setViewsEnabled(false)
.setQueryResultsCacheEnabled(false)
.setRpcInitialChannelCount(1)
Expand All @@ -69,6 +71,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.views-cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "10d")
.put("bigquery.metadata.cache-ttl", "5d")
.put("bigquery.query-results-cache.enabled", "true")
.put("bigquery.channel-pool.initial-size", "11")
.put("bigquery.channel-pool.min-size", "12")
Expand All @@ -90,6 +93,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.setCaseInsensitiveNameMatching(true)
.setViewsCacheTtl(new Duration(1, MINUTES))
.setServiceCacheTtl(new Duration(10, DAYS))
.setMetadataCacheTtl(new Duration(5, DAYS))
.setQueryResultsCacheEnabled(true)
.setRpcInitialChannelCount(11)
.setRpcMinChannelCount(12)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.common.collect.ImmutableMap;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.testng.annotations.Test;

import static io.trino.plugin.bigquery.BigQueryQueryRunner.BigQuerySqlExecutor;
import static io.trino.testing.sql.TestTable.randomTableSuffix;
import static org.testng.Assert.assertEquals;

/**
 * Checks that the BigQuery connector keeps serving a cached dataset listing when
 * {@code bigquery.metadata.cache-ttl} is set (here to {@code 5m}), even after the
 * dataset has been dropped directly in BigQuery behind the connector's back.
 */
public class TestBigQueryMetadataCaching
        extends AbstractTestQueryFramework
{
    protected BigQuerySqlExecutor bigQuerySqlExecutor;

    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        bigQuerySqlExecutor = new BigQuerySqlExecutor();
        return BigQueryQueryRunner.createQueryRunner(
                ImmutableMap.of(),
                ImmutableMap.of("bigquery.metadata.cache-ttl", "5m"));
    }

    @Test
    public void testMetadataCaching()
    {
        String schema = "test_metadata_caching_" + randomTableSuffix();
        try {
            // Create the schema through Trino and make sure it is listed
            getQueryRunner().execute("CREATE SCHEMA " + schema);
            assertEquals(showSchemaLike(schema), schema);

            String schemaTableName = schema + ".test_metadata_caching";
            getQueryRunner().execute("CREATE TABLE " + schemaTableName + " AS SELECT * FROM tpch.tiny.region");
            assertEquals(getQueryRunner().execute("SELECT * FROM " + schemaTableName).getRowCount(), 5);

            // Drop the schema directly in BigQuery; the listing seen through Trino must still contain it
            bigQuerySqlExecutor.execute("DROP SCHEMA " + schema + " CASCADE");
            assertEquals(showSchemaLike(schema), schema);

            // Reading the table still fails because the schema is really gone
            assertQueryFails("SELECT * FROM " + schemaTableName, ".*Schema '.+' does not exist.*");
        }
        finally {
            bigQuerySqlExecutor.execute("DROP SCHEMA IF EXISTS " + schema + " CASCADE");
        }
    }

    // Runs SHOW SCHEMAS filtered to the given name and returns the single value of the result
    private Object showSchemaLike(String schema)
    {
        return getQueryRunner().execute("SHOW SCHEMAS IN bigquery LIKE '" + schema + "'").getOnlyValue();
    }
}

0 comments on commit b69938f

Please sign in to comment.