From 56daff86f800f2ac714abce6d96faaac3d20863b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 13:10:00 +0100 Subject: [PATCH 1/9] Remove redundant asyncReloading CacheLoader wrapper The changed caches do not set `cacheBuilder.refreshAfterWrite` (since 3a1bf59d7ca5008419d3efacf00fe1e99fa1d91c), so the `asyncReloading` wrapper (which changes `CacheLoader.reload` and delegates the rest) was redundant. --- .../plugin/hive/metastore/cache/CachingHiveMetastore.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 528cf56adaef..08f0bb341226 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -184,7 +184,7 @@ protected CachingHiveMetastore(HiveMetastore delegate, Executor executor, Option // disable refresh since it can't use the bulk loading and causes too many requests partitionStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording) - .build(asyncReloading(new CacheLoader<>() + .build(new CacheLoader<>() { @Override public PartitionStatistics load(WithIdentity key) @@ -197,7 +197,7 @@ public Map, PartitionStatistics> loadAll(Iterabl { return loadPartitionColumnStatistics(keys); } - }, executor)); + }); tableCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) .build(asyncReloading(CacheLoader.from(this::loadTable), executor)); @@ -210,7 +210,7 @@ public Map, PartitionStatistics> loadAll(Iterabl // disable refresh since it can't use the bulk loading and causes too many requests partitionCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording) - .build(asyncReloading(new CacheLoader<>() + .build(new CacheLoader<>() { @Override public Optional load(WithIdentity partitionName) @@ -223,7 +223,7 @@ public Map, Optional> loadAll(Iterabl { return loadPartitionsByNames(partitionNames); } - }, executor)); + }); tablePrivilegesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) .build(asyncReloading(CacheLoader.from(key -> loadTablePrivileges(key.getDatabase(), key.getTable(), key.getOwner(), key.getPrincipal())), executor)); From eeb48237a77b8914d77800923591ebbdc16bb076 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 13:24:29 +0100 Subject: [PATCH 2/9] Refactor CachingHiveMetastore cache building Capture more commonalities than before.
- `asyncReloading` is used for all caches with `refreshMillis` set - `memoizeMetastore` no longer provides an (unused) Executor - individual cache instantiation is now a single line, so all the code reads better - separate bulk-loading cache building from non-bulk loading, since we do not want to do refreshes for bulk-loading caches - remove redundant `cacheBuilder = cacheBuilder.xxx` assignments --- .../metastore/cache/CachingHiveMetastore.java | 150 ++++++++++-------- .../hive/AbstractTestHiveFileSystem.java | 6 +- 2 files changed, 82 insertions(+), 74 deletions(-) diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 08f0bb341226..ab77fd75b45b 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -70,6 +70,7 @@ import java.util.function.Function; import java.util.function.Predicate; +import static com.google.common.base.Functions.identity; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -82,7 +83,6 @@ import static com.google.common.collect.ImmutableSetMultimap.toImmutableSetMultimap; import static com.google.common.collect.Maps.immutableEntry; import static com.google.common.collect.Streams.stream; -import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.trino.plugin.hive.HivePartitionManager.extractPartitionValues; import static io.trino.plugin.hive.metastore.HivePartitionName.hivePartitionName; import static io.trino.plugin.hive.metastore.HiveTableName.hiveTableName; @@ -141,12 +141,12 @@ public static CachingHiveMetastore cachingHiveMetastore(HiveMetastore delegate, format("Invalid cache parameters (cacheTtl: %s, maxSize: %s)", cacheTtl, maximumSize)); return new CachingHiveMetastore( delegate, - executor, OptionalLong.of(cacheTtl.toMillis()), refreshInterval .map(Duration::toMillis) .map(OptionalLong::of) .orElseGet(OptionalLong::empty), + Optional.of(executor), maximumSize, StatsRecording.ENABLED); } @@ -155,90 +155,50 @@ public static CachingHiveMetastore memoizeMetastore(HiveMetastore delegate, long { return new CachingHiveMetastore( delegate, - newDirectExecutorService(), OptionalLong.empty(), OptionalLong.empty(), + Optional.empty(), maximumSize, StatsRecording.DISABLED); } - protected CachingHiveMetastore(HiveMetastore delegate, Executor executor, OptionalLong expiresAfterWriteMillis, OptionalLong refreshMills, long maximumSize, StatsRecording statsRecording) + protected CachingHiveMetastore(HiveMetastore delegate, OptionalLong expiresAfterWriteMillis, OptionalLong refreshMills, Optional executor, long maximumSize, StatsRecording statsRecording) { this.delegate = requireNonNull(delegate, "delegate is null"); requireNonNull(executor, "executor is null"); - databaseNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadAllDatabases), executor)); + databaseNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, ignored -> loadAllDatabases()); - databaseCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, 
statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadDatabase), executor)); + databaseCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadDatabase); - tableNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadAllTables), executor)); + tableNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadAllTables); - tablesWithParameterCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadTablesMatchingParameter), executor)); + tablesWithParameterCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTablesMatchingParameter); - tableStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadTableColumnStatistics), executor)); + tableStatisticsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTableColumnStatistics); // disable refresh since it can't use the bulk loading and causes too many requests - partitionStatisticsCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording) - .build(new CacheLoader<>() - { - @Override - public PartitionStatistics load(WithIdentity key) - { - return loadPartitionColumnStatistics(key); - } - - @Override - public Map, PartitionStatistics> loadAll(Iterable> keys) - { - return loadPartitionColumnStatistics(keys); - } - }); - - tableCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadTable), executor)); - - viewNamesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadAllViews), executor)); - - partitionFilterCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadPartitionNamesByFilter), executor)); + partitionStatisticsCache = buildCache(expiresAfterWriteMillis, maximumSize, statsRecording, this::loadPartitionColumnStatistics, this::loadPartitionsColumnStatistics); - // disable refresh since it can't use the bulk loading and causes too many requests - partitionCache = newCacheBuilder(expiresAfterWriteMillis, OptionalLong.empty(), maximumSize, statsRecording) - .build(new CacheLoader<>() - { - @Override - public Optional load(WithIdentity partitionName) - { - return loadPartitionByName(partitionName); - } + tableCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadTable); + + viewNamesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadAllViews); - @Override - public Map, Optional> loadAll(Iterable> partitionNames) - { - return loadPartitionsByNames(partitionNames); - } - }); + partitionFilterCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadPartitionNamesByFilter); - tablePrivilegesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(key -> loadTablePrivileges(key.getDatabase(), key.getTable(), key.getOwner(), 
key.getPrincipal())), executor)); + // disable refresh since it can't use the bulk loading and causes too many requests + partitionCache = buildCache(expiresAfterWriteMillis, maximumSize, statsRecording, this::loadPartitionByName, this::loadPartitionsByNames); + + tablePrivilegesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, key -> + loadTablePrivileges(key.getDatabase(), key.getTable(), key.getOwner(), key.getPrincipal())); - rolesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadRoles), executor)); + rolesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, ignored -> loadRoles()); - roleGrantsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadRoleGrants), executor)); + roleGrantsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadRoleGrants); - grantedPrincipalsCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadPrincipals), executor)); + grantedPrincipalsCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadPrincipals); - configValuesCache = newCacheBuilder(expiresAfterWriteMillis, refreshMills, maximumSize, statsRecording) - .build(asyncReloading(CacheLoader.from(this::loadConfigValue), executor)); + configValuesCache = buildCache(expiresAfterWriteMillis, refreshMills, executor, maximumSize, statsRecording, this::loadConfigValue); } @Managed @@ -375,7 +335,7 @@ private PartitionStatistics loadPartitionColumnStatistics(WithIdentity, PartitionStatistics> loadPartitionColumnStatistics(Iterable> keys) + private Map, PartitionStatistics> loadPartitionsColumnStatistics(Iterable> keys) { SetMultimap, WithIdentity> tablePartitions = stream(keys) .collect(toImmutableSetMultimap(value -> new WithIdentity<>(value.getIdentity(), value.getKey().getHiveTableName()), Function.identity())); @@ -1056,20 +1016,70 @@ public void alterTransactionalTable(HiveIdentity identity, Table table, long tra } } - private static CacheBuilder newCacheBuilder(OptionalLong expiresAfterWriteMillis, OptionalLong refreshMillis, long maximumSize, StatsRecording statsRecording) + private static LoadingCache buildCache( + OptionalLong expiresAfterWriteMillis, + OptionalLong refreshMillis, + Optional refreshExecutor, + long maximumSize, + StatsRecording statsRecording, + com.google.common.base.Function loader) { + CacheLoader cacheLoader = CacheLoader.from(loader); + CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); if (expiresAfterWriteMillis.isPresent()) { - cacheBuilder = cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); } + checkArgument(refreshMillis.isEmpty() || refreshExecutor.isPresent(), "refreshMillis is provided but refreshExecutor is not"); if (refreshMillis.isPresent() && (expiresAfterWriteMillis.isEmpty() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) { - cacheBuilder = cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS); + cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS); + cacheLoader = asyncReloading(cacheLoader, refreshExecutor.orElseThrow(() -> new 
IllegalArgumentException("Executor not provided"))); } - cacheBuilder = cacheBuilder.maximumSize(maximumSize); + cacheBuilder.maximumSize(maximumSize); if (statsRecording == StatsRecording.ENABLED) { - cacheBuilder = cacheBuilder.recordStats(); + cacheBuilder.recordStats(); } - return cacheBuilder; + + return cacheBuilder.build(cacheLoader); + } + + private static LoadingCache buildCache( + OptionalLong expiresAfterWriteMillis, + long maximumSize, + StatsRecording statsRecording, + Function loader, + Function, Map> bulkLoader) + { + requireNonNull(loader, "loader is null"); + requireNonNull(bulkLoader, "bulkLoader is null"); + CacheLoader cacheLoader = new CacheLoader<>() + { + @Override + public V load(K key) + { + return loader.apply(key); + } + + @Override + public Map loadAll(Iterable keys) + { + return bulkLoader.apply(Iterables.transform(keys, identity())); + } + }; + + CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); + if (expiresAfterWriteMillis.isPresent()) { + cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + } + + // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests + + cacheBuilder.maximumSize(maximumSize); + if (statsRecording == StatsRecording.ENABLED) { + cacheBuilder.recordStats(); + } + + return cacheBuilder.build(cacheLoader); } private static class WithIdentity diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 41f2bf5edada..1ed0520dd6f6 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -79,7 +79,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalLong; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.IntStream; @@ -196,7 +195,6 @@ protected void setup(String host, int port, String databaseName, boolean s3Selec new ThriftMetastoreConfig(), hdfsEnvironment, false)), - executor, getBasePath(), hdfsEnvironment); locationService = new HiveLocationService(hdfsEnvironment); @@ -543,9 +541,9 @@ protected static class TestingHiveMetastore private final Path basePath; private final HdfsEnvironment hdfsEnvironment; - public TestingHiveMetastore(HiveMetastore delegate, Executor executor, Path basePath, HdfsEnvironment hdfsEnvironment) + public TestingHiveMetastore(HiveMetastore delegate, Path basePath, HdfsEnvironment hdfsEnvironment) { - super(delegate, executor, OptionalLong.empty(), OptionalLong.empty(), 0, StatsRecording.ENABLED); + super(delegate, OptionalLong.empty(), OptionalLong.empty(), Optional.empty(), 0, StatsRecording.ENABLED); this.basePath = basePath; this.hdfsEnvironment = hdfsEnvironment; } From edf0ee1b44945d833b041e51397099de951f84cb Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 15:55:32 +0100 Subject: [PATCH 3/9] Extract testing enum for reuse --- .../trino/plugin/base/cache/Invalidation.java | 35 +++++++++++++++++++ .../plugin/base/cache/TestEvictableCache.java | 22 ++---------- 2 files changed, 37 insertions(+), 20 deletions(-) create mode 100644 lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/Invalidation.java diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/Invalidation.java 
b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/Invalidation.java new file mode 100644 index 000000000000..45cdb3e47978 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/Invalidation.java @@ -0,0 +1,35 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.cache; + +import org.testng.annotations.DataProvider; + +import java.util.stream.Stream; + +enum Invalidation +{ + INVALIDATE_KEY, + INVALIDATE_PREDEFINED_KEYS, + INVALIDATE_SELECTED_KEYS, + INVALIDATE_ALL, + /**/; + + @DataProvider + public static Object[][] invalidations() + { + return Stream.of(values()) + .map(invalidation -> new Object[] {invalidation}) + .toArray(Object[][]::new); + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java index 596813a33aab..0cce0419f849 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java @@ -17,7 +17,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; import io.airlift.concurrent.MoreFutures; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.List; @@ -28,7 +27,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; -import java.util.stream.Stream; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; @@ -43,14 +41,6 @@ public class TestEvictableCache { private static final int TEST_TIMEOUT_MILLIS = 10_000; - private enum Invalidation - { - INVALIDATE_KEY, - INVALIDATE_PREDEFINED_KEYS, - INVALIDATE_SELECTED_KEYS, - INVALIDATE_ALL, - } - @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testLoad() throws Exception @@ -67,7 +57,7 @@ public void testLoad() /** * Covers https://github.com/google/guava/issues/1881 */ - @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "invalidations") + @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProviderClass = Invalidation.class, dataProvider = "invalidations") public void testInvalidateOngoingLoad(Invalidation invalidation) throws Exception { @@ -126,7 +116,7 @@ public void testInvalidateOngoingLoad(Invalidation invalidation) /** * Covers https://github.com/google/guava/issues/1881 */ - @Test(invocationCount = 10, timeOut = TEST_TIMEOUT_MILLIS, dataProvider = "invalidations") + @Test(invocationCount = 10, timeOut = TEST_TIMEOUT_MILLIS, dataProviderClass = Invalidation.class, dataProvider = "invalidations") public void testInvalidateAndLoadConcurrently(Invalidation invalidation) throws Exception { @@ -190,12 +180,4 @@ public void testInvalidateAndLoadConcurrently(Invalidation invalidation) executor.awaitTermination(10, SECONDS); } } - - 
@DataProvider - public static Object[][] invalidations() - { - return Stream.of(Invalidation.values()) - .map(invalidation -> new Object[] {invalidation}) - .toArray(Object[][]::new); - } } From 6b7d072d6d9092c1ff2845957fea9e55532cdfb3 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 22:42:07 +0100 Subject: [PATCH 4/9] Rename some trino-plugin-toolkit test resources They have same names as test resources in trino-main. This allows toolkit's and main's test jars to be used together on a plugin's classpath. The chosen prefix, `file-based-system-`, is already used for a few other test resources used in the same test class. --- .../TestFileBasedSystemAccessControl.java | 18 +++++++++--------- ...log.json => file-based-system-catalog.json} | 0 ...> file-based-system-catalog_principal.json} | 0 ...curity-config-file-with-unknown-rules.json} | 0 4 files changed, 9 insertions(+), 9 deletions(-) rename lib/trino-plugin-toolkit/src/test/resources/{catalog.json => file-based-system-catalog.json} (100%) rename lib/trino-plugin-toolkit/src/test/resources/{catalog_principal.json => file-based-system-catalog_principal.json} (100%) rename lib/trino-plugin-toolkit/src/test/resources/{security-config-file-with-unknown-rules.json => file-based-system-security-config-file-with-unknown-rules.json} (100%) diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/security/TestFileBasedSystemAccessControl.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/security/TestFileBasedSystemAccessControl.java index afacba34e08c..b972da6b7c26 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/security/TestFileBasedSystemAccessControl.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/security/TestFileBasedSystemAccessControl.java @@ -745,7 +745,7 @@ public void testTableRulesForCheckCanSetTableProperties() @Test public void testCanSetUserOperations() { - SystemAccessControl accessControl = newFileBasedSystemAccessControl("catalog_principal.json"); + SystemAccessControl accessControl = newFileBasedSystemAccessControl("file-based-system-catalog_principal.json"); try { accessControl.checkCanSetUser(Optional.empty(), alice.getUser()); @@ -780,7 +780,7 @@ public void testCanSetUserOperations() catch (AccessDeniedException expected) { } - SystemAccessControl accessControlNoPatterns = newFileBasedSystemAccessControl("catalog.json"); + SystemAccessControl accessControlNoPatterns = newFileBasedSystemAccessControl("file-based-system-catalog.json"); accessControlNoPatterns.checkCanSetUser(kerberosValidAlice.getPrincipal(), kerberosValidAlice.getUser()); } @@ -851,7 +851,7 @@ public void testInvalidQuery() @Test public void testQueryNotSet() { - SystemAccessControl accessControlManager = newFileBasedSystemAccessControl("catalog.json"); + SystemAccessControl accessControlManager = newFileBasedSystemAccessControl("file-based-system-catalog.json"); accessControlManager.checkCanExecuteQuery(new SystemSecurityContext(bob, queryId)); accessControlManager.checkCanViewQueryOwnedBy(new SystemSecurityContext(bob, queryId), any); @@ -939,7 +939,7 @@ public void testSystemInformation() @Test public void testSystemInformationNotSet() { - SystemAccessControl accessControlManager = newFileBasedSystemAccessControl("catalog.json"); + SystemAccessControl accessControlManager = newFileBasedSystemAccessControl("file-based-system-catalog.json"); assertThatThrownBy(() -> accessControlManager.checkCanReadSystemInformation(new SystemSecurityContext(bob, 
Optional.empty()))) .isInstanceOf(AccessDeniedException.class) @@ -974,7 +974,7 @@ public void testSystemInformationDocsExample() @Test public void testSchemaOperations() { - SystemAccessControl accessControl = newFileBasedSystemAccessControl("catalog.json"); + SystemAccessControl accessControl = newFileBasedSystemAccessControl("file-based-system-catalog.json"); TrinoPrincipal user = new TrinoPrincipal(PrincipalType.USER, "some_user"); TrinoPrincipal role = new TrinoPrincipal(PrincipalType.ROLE, "some_user"); @@ -1308,7 +1308,7 @@ public void testRefreshing() { File configFile = newTemporaryFile(); configFile.deleteOnExit(); - copy(new File(getResourcePath("catalog.json")), configFile); + copy(new File(getResourcePath("file-based-system-catalog.json")), configFile); SystemAccessControl accessControl = newFileBasedSystemAccessControl(ImmutableMap.of( SECURITY_CONFIG_FILE, configFile.getAbsolutePath(), @@ -1319,7 +1319,7 @@ public void testRefreshing() accessControl.checkCanCreateView(alice, aliceView); accessControl.checkCanCreateView(alice, aliceView); - copy(new File(getResourcePath("security-config-file-with-unknown-rules.json")), configFile); + copy(new File(getResourcePath("file-based-system-security-config-file-with-unknown-rules.json")), configFile); sleep(2); assertThatThrownBy(() -> accessControl.checkCanCreateView(alice, aliceView)) @@ -1331,7 +1331,7 @@ public void testRefreshing() .isInstanceOf(IllegalArgumentException.class) .hasMessageStartingWith("Invalid JSON file"); - copy(new File(getResourcePath("catalog.json")), configFile); + copy(new File(getResourcePath("file-based-system-catalog.json")), configFile); sleep(2); accessControl.checkCanCreateView(alice, aliceView); @@ -1340,7 +1340,7 @@ public void testRefreshing() @Test public void parseUnknownRules() { - assertThatThrownBy(() -> newFileBasedSystemAccessControl("security-config-file-with-unknown-rules.json")) + assertThatThrownBy(() -> newFileBasedSystemAccessControl("file-based-system-security-config-file-with-unknown-rules.json")) .hasMessageContaining("Invalid JSON"); } diff --git a/lib/trino-plugin-toolkit/src/test/resources/catalog.json b/lib/trino-plugin-toolkit/src/test/resources/file-based-system-catalog.json similarity index 100% rename from lib/trino-plugin-toolkit/src/test/resources/catalog.json rename to lib/trino-plugin-toolkit/src/test/resources/file-based-system-catalog.json diff --git a/lib/trino-plugin-toolkit/src/test/resources/catalog_principal.json b/lib/trino-plugin-toolkit/src/test/resources/file-based-system-catalog_principal.json similarity index 100% rename from lib/trino-plugin-toolkit/src/test/resources/catalog_principal.json rename to lib/trino-plugin-toolkit/src/test/resources/file-based-system-catalog_principal.json diff --git a/lib/trino-plugin-toolkit/src/test/resources/security-config-file-with-unknown-rules.json b/lib/trino-plugin-toolkit/src/test/resources/file-based-system-security-config-file-with-unknown-rules.json similarity index 100% rename from lib/trino-plugin-toolkit/src/test/resources/security-config-file-with-unknown-rules.json rename to lib/trino-plugin-toolkit/src/test/resources/file-based-system-security-config-file-with-unknown-rules.json From 2c5dcb608f0bef0570cc366f717af1b511f8b95b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 16:03:09 +0100 Subject: [PATCH 5/9] Extract CacheStatsAssertions for reuse Extract `CacheStatsAssertions` from `TestCachingJdbcClient`. 
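For reference, the extracted `CacheStatsAssertions` asserts the change in Guava `CacheStats` counters around an action: it snapshots the stats, runs the action, and compares the deltas against the expected hit, miss, and load counts. A minimal usage sketch, not part of the patch (the class name and toy loader below are made up; it assumes Guava and the trino-plugin-toolkit test jar on the classpath):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import static io.trino.plugin.base.cache.CacheStatsAssertions.assertCacheStats;

public class CacheStatsAssertionsExample
{
    public static void main(String[] args)
    {
        // recordStats() is required, otherwise all CacheStats counters stay at zero
        CacheLoader<Integer, String> loader = CacheLoader.from(key -> "value-" + key);
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder()
                .recordStats()
                .build(loader);

        // First lookup of a key: expect the counters to grow by one miss and one load
        assertCacheStats(cache::stats)
                .misses(1)
                .loads(1)
                .afterRunning(() -> cache.getUnchecked(42));

        // Second lookup of the same key: expect a single hit and no further loads
        assertCacheStats(cache::stats)
                .hits(1)
                .afterRunning(() -> cache.getUnchecked(42));
    }
}

Counters that are not specified default to zero, so each assertion also verifies that nothing else changed during the action.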
--- .../base/cache/CacheStatsAssertions.java | 79 +++++++++++++++++++ plugin/trino-base-jdbc/pom.xml | 7 ++ .../plugin/jdbc/TestCachingJdbcClient.java | 64 ++------------- pom.xml | 7 ++ 4 files changed, 98 insertions(+), 59 deletions(-) create mode 100644 lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java new file mode 100644 index 000000000000..6116de6e5234 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java @@ -0,0 +1,79 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.cache; + +import com.google.common.cache.CacheStats; + +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; +import static org.assertj.core.api.Assertions.assertThat; + +public class CacheStatsAssertions +{ + public static CacheStatsAssertions assertCacheStats(Supplier statsSupplier) + { + return new CacheStatsAssertions(statsSupplier); + } + + private final Supplier stats; + + private long loads; + private long hits; + private long misses; + + private CacheStatsAssertions(Supplier stats) + { + this.stats = requireNonNull(stats, "stats is null"); + } + + public CacheStatsAssertions loads(long value) + { + this.loads = value; + return this; + } + + public CacheStatsAssertions hits(long value) + { + this.hits = value; + return this; + } + + public CacheStatsAssertions misses(long value) + { + this.misses = value; + return this; + } + + public void afterRunning(Runnable runnable) + { + CacheStats beforeStats = stats.get(); + runnable.run(); + CacheStats afterStats = stats.get(); + + long expectedLoad = beforeStats.loadCount() + loads; + long expectedMisses = beforeStats.missCount() + misses; + long expectedHits = beforeStats.hitCount() + hits; + + assertThat(afterStats.loadCount()) + .withFailMessage("Expected load count is %d but actual is %d", expectedLoad, afterStats.loadCount()) + .isEqualTo(expectedLoad); + assertThat(afterStats.hitCount()) + .withFailMessage("Expected hit count is %d but actual is %d", expectedHits, afterStats.hitCount()) + .isEqualTo(expectedHits); + assertThat(afterStats.missCount()) + .withFailMessage("Expected miss count is %d but actual is %d", expectedMisses, afterStats.missCount()) + .isEqualTo(expectedMisses); + } +} diff --git a/plugin/trino-base-jdbc/pom.xml b/plugin/trino-base-jdbc/pom.xml index 87b6608660a4..78f3dab893ed 100644 --- a/plugin/trino-base-jdbc/pom.xml +++ b/plugin/trino-base-jdbc/pom.xml @@ -165,6 +165,13 @@ test + + io.trino + trino-plugin-toolkit + test-jar + test + + io.trino trino-spi diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java index 4e49c8cd5d4b..b3e219805946 
100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java @@ -13,11 +13,11 @@ */ package io.trino.plugin.jdbc; -import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import io.airlift.units.Duration; +import io.trino.plugin.base.cache.CacheStatsAssertions; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.jdbc.credential.ExtraCredentialConfig; import io.trino.spi.connector.ColumnHandle; @@ -44,9 +44,9 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.function.Supplier; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.trino.plugin.base.cache.CacheStatsAssertions.assertCacheStats; import static io.trino.spi.session.PropertyMetadata.stringProperty; import static io.trino.spi.testing.InterfaceTestUtils.assertAllMethodsOverridden; import static io.trino.spi.type.IntegerType.INTEGER; @@ -55,7 +55,6 @@ import static java.lang.Math.abs; import static java.lang.Math.min; import static java.util.Collections.emptyList; -import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -673,70 +672,17 @@ public void testEverythingImplemented() private static CacheStatsAssertions assertTableNamesCache(CachingJdbcClient cachingJdbcClient) { - return new CacheStatsAssertions(cachingJdbcClient::getTableNamesCacheStats); + return assertCacheStats(cachingJdbcClient::getTableNamesCacheStats); } private static CacheStatsAssertions assertColumnCacheStats(CachingJdbcClient client) { - return new CacheStatsAssertions(client::getColumnsCacheStats); + return assertCacheStats(client::getColumnsCacheStats); } private static CacheStatsAssertions assertStatisticsCacheStats(CachingJdbcClient client) { - return new CacheStatsAssertions(client::getStatisticsCacheStats); - } - - private static final class CacheStatsAssertions - { - private final Supplier stats; - - private long loads; - private long hits; - private long misses; - - private CacheStatsAssertions(Supplier stats) - { - this.stats = requireNonNull(stats, "stats is null"); - } - - public CacheStatsAssertions loads(long value) - { - this.loads = value; - return this; - } - - public CacheStatsAssertions hits(long value) - { - this.hits = value; - return this; - } - - public CacheStatsAssertions misses(long value) - { - this.misses = value; - return this; - } - - public void afterRunning(Runnable runnable) - { - CacheStats beforeStats = stats.get(); - runnable.run(); - CacheStats afterStats = stats.get(); - - long expectedLoad = beforeStats.loadCount() + loads; - long expectedMisses = beforeStats.missCount() + misses; - long expectedHits = beforeStats.hitCount() + hits; - - assertThat(afterStats.loadCount()) - .withFailMessage("Expected load count is %d but actual is %d", expectedLoad, afterStats.loadCount()) - .isEqualTo(expectedLoad); - assertThat(afterStats.hitCount()) - .withFailMessage("Expected hit count is %d but actual is %d", expectedHits, afterStats.hitCount()) - .isEqualTo(expectedHits); - assertThat(afterStats.missCount()) - .withFailMessage("Expected miss count is %d but actual is %d", 
expectedMisses, afterStats.missCount()) - .isEqualTo(expectedMisses); - } + return assertCacheStats(client::getStatisticsCacheStats); } private static String randomSuffix() diff --git a/pom.xml b/pom.xml index 436873a83e3c..4f9400a79d32 100644 --- a/pom.xml +++ b/pom.xml @@ -388,6 +388,13 @@ ${project.version} + + io.trino + trino-plugin-toolkit + test-jar + ${project.version} + + io.trino trino-product-tests From 512536214d6aff668a49ba16fb4055605418fdc3 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 16:17:43 +0100 Subject: [PATCH 6/9] Test EvictableCache hits --- lib/trino-plugin-toolkit/pom.xml | 7 ++++ .../base/cache/CacheStatsAssertions.java | 26 +++++++++++- .../plugin/base/cache/TestEvictableCache.java | 40 ++++++++++++++++++- 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/lib/trino-plugin-toolkit/pom.xml b/lib/trino-plugin-toolkit/pom.xml index e848f83e5db5..ebb2211fb649 100644 --- a/lib/trino-plugin-toolkit/pom.xml +++ b/lib/trino-plugin-toolkit/pom.xml @@ -146,6 +146,13 @@ test + + org.gaul + modernizer-maven-annotations + 2.1.0 + test + + org.testng testng diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java index 6116de6e5234..aaadfd619466 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/CacheStatsAssertions.java @@ -13,8 +13,10 @@ */ package io.trino.plugin.base.cache; +import com.google.common.cache.Cache; import com.google.common.cache.CacheStats; +import java.util.concurrent.Callable; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -22,6 +24,12 @@ public class CacheStatsAssertions { + public static CacheStatsAssertions assertCacheStats(Cache cache) + { + requireNonNull(cache, "cache is null"); + return assertCacheStats(cache::stats); + } + public static CacheStatsAssertions assertCacheStats(Supplier statsSupplier) { return new CacheStatsAssertions(statsSupplier); @@ -57,9 +65,23 @@ public CacheStatsAssertions misses(long value) } public void afterRunning(Runnable runnable) + { + try { + calling(() -> { + runnable.run(); + return null; + }); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public T calling(Callable callable) + throws Exception { CacheStats beforeStats = stats.get(); - runnable.run(); + T value = callable.call(); CacheStats afterStats = stats.get(); long expectedLoad = beforeStats.loadCount() + loads; @@ -75,5 +97,7 @@ public void afterRunning(Runnable runnable) assertThat(afterStats.missCount()) .withFailMessage("Expected miss count is %d but actual is %d", expectedMisses, afterStats.missCount()) .isEqualTo(expectedMisses); + + return value; } } diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java index 0cce0419f849..4a816a868e97 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java @@ -17,6 +17,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; import io.airlift.concurrent.MoreFutures; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; import 
org.testng.annotations.Test; import java.util.List; @@ -30,10 +31,12 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.trino.plugin.base.cache.CacheStatsAssertions.assertCacheStats; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -44,14 +47,47 @@ public class TestEvictableCache @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testLoad() throws Exception + { + Cache cache = EvictableCache.buildWith( + CacheBuilder.newBuilder()); + assertEquals(cache.get(42, () -> "abc"), "abc"); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testLoadStats() + throws Exception { Cache cache = EvictableCache.buildWith( CacheBuilder.newBuilder()); assertEquals(cache.stats(), new CacheStats(0, 0, 0, 0, 0, 0)); - String value = cache.get(42, () -> "abc"); - assertEquals(cache.stats(), new CacheStats(0, 1, 1, 0, cache.stats().totalLoadTime(), 0)); + + String value = assertCacheStats(cache) + .misses(1) + .loads(1) + .calling(() -> cache.get(42, () -> "abc")); assertEquals(value, "abc"); + + value = assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(42, () -> "xyz")); + assertEquals(value, "abc"); + + // with equal, but not the same key + value = assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(newInteger(42), () -> "xyz")); + assertEquals(value, "abc"); + } + + @SuppressModernizer + private static Integer newInteger(int value) + { + Integer integer = value; + @SuppressWarnings({"UnnecessaryBoxing", "deprecation", "BoxedPrimitiveConstructor"}) + Integer newInteger = new Integer(value); + assertNotSame(integer, newInteger); + return newInteger; } /** From e454a5013a33e462d522dbc1eaf70f309de6d71b Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 18 Jan 2022 10:26:05 +0100 Subject: [PATCH 7/9] Fix and improve EvictableCache javadoc Fix grammar. Improve wording. --- .../java/io/trino/plugin/base/cache/EvictableCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java index 97cedec90f9e..43e03d6e05b2 100644 --- a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableCache.java @@ -36,10 +36,10 @@ import static java.util.Objects.requireNonNull; /** - * A Cache implementation similar to ones produced by {@code com.google.common.cache.CacheBuilder}, - * but one that does not exhibits Guava issue #1881, i.e. - * a {@link #getIfPresent(Object)} after {@link #invalidate(Object)} is guaranteed to return {@code null} and - * {@link #get(Object, Callable)} after {@link #invalidate(Object)} is guaranteed to load a fresh value. 
+ * A {@link Cache} implementation similar to ones produced by {@link CacheBuilder#build()}, but one that does not exhibit + * Guava issue #1881: a cache inspection with + * {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return fresh state after + * {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. */ public class EvictableCache extends AbstractCache From fcce46e2f54497ac389f7b8e83773f1276ad9989 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Tue, 18 Jan 2022 12:40:22 +0100 Subject: [PATCH 8/9] Improve test interleaving determinism --- .../io/trino/plugin/base/cache/TestEvictableCache.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java index 4a816a868e97..2b8cb9517c5e 100644 --- a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableCache.java @@ -102,15 +102,18 @@ public void testInvalidateOngoingLoad(Invalidation invalidation) CountDownLatch loadOngoing = new CountDownLatch(1); CountDownLatch invalidated = new CountDownLatch(1); + CountDownLatch getReturned = new CountDownLatch(1); ExecutorService executor = newFixedThreadPool(2); try { // thread A Future threadA = executor.submit(() -> { - return cache.get(key, () -> { + String value = cache.get(key, () -> { loadOngoing.countDown(); // 1 assertTrue(invalidated.await(10, SECONDS)); // 2 return "stale value"; }); + getReturned.countDown(); // 3 + return value; }); // thread B @@ -136,6 +139,8 @@ public void testInvalidateOngoingLoad(Invalidation invalidation) } invalidated.countDown(); // 2 + // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed. + assertTrue(getReturned.await(10, SECONDS)); // 3 return cache.get(key, () -> "fresh value"); }); From 0e42ed3bc6739e80d55777710a7d8da77aa35a61 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 17 Jan 2022 19:06:51 +0100 Subject: [PATCH 9/9] Fix invalidation race in CachingHiveMetastore The `LoadingCache` used in `CachingHiveMetastore` was susceptible to a race between load-in-flight and invalidation. The invalidation of a key would not prevent load in flight from being inserted into the cache. 
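To make the race concrete, here is a minimal self-contained sketch, not part of the patch series (the class name and latch choreography are made up; it assumes only Guava on the classpath): with a plain Guava `LoadingCache`, a load that began before `invalidate()` can still install its stale result after `invalidate()` returns, which is what the `EvictableLoadingCache` introduced below is meant to prevent.

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;

import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;

public class StaleLoadSketch
{
    public static void main(String[] args)
            throws Exception
    {
        ConcurrentMap<Integer, String> remote = new ConcurrentHashMap<>();
        remote.put(42, "stale value");
        CountDownLatch loadStarted = new CountDownLatch(1);
        CountDownLatch invalidated = new CountDownLatch(1);

        CacheLoader<Integer, String> loader = CacheLoader.from(key -> {
            String value = remote.get(key);
            loadStarted.countDown();
            awaitUninterruptibly(invalidated); // hold the in-flight load until after invalidate()
            return value;                      // the stale result is still installed into the cache
        });
        LoadingCache<Integer, String> cache = CacheBuilder.newBuilder().build(loader);

        Thread reader = new Thread(() -> cache.getUnchecked(42)); // starts the load
        reader.start();

        loadStarted.await();
        remote.put(42, "fresh value");
        cache.invalidate(42);       // does not cancel or supersede the in-flight load
        invalidated.countDown();
        reader.join();

        // With a plain Guava cache this typically prints "stale value" (Guava issue #1881);
        // the EvictableLoadingCache added below is designed to load "fresh value" at this point.
        System.out.println(cache.getUnchecked(42));
    }
}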
--- .../base/cache/EvictableLoadingCache.java | 375 ++++++++++++++++++ .../base/cache/TestEvictableLoadingCache.java | 298 ++++++++++++++ .../metastore/cache/CachingHiveMetastore.java | 40 +- .../metastore/UnimplementedHiveMetastore.java | 2 +- .../cache/TestCachingHiveMetastore.java | 198 +++++++++ 5 files changed, 888 insertions(+), 25 deletions(-) create mode 100644 lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java create mode 100644 lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableLoadingCache.java diff --git a/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java new file mode 100644 index 000000000000..a9b532638b14 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/cache/EvictableLoadingCache.java @@ -0,0 +1,375 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.base.cache; + +import com.google.common.cache.AbstractLoadingCache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.BiMap; +import com.google.common.collect.HashBiMap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkState; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * A {@link LoadingCache} implementation similar to ones produced by {@link CacheBuilder#build(CacheLoader)}, + * but one that does not exhibit Guava issue #1881: + * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object)} is guaranteed to return fresh + * state after {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. 
+ */ +public class EvictableLoadingCache + extends AbstractLoadingCache +{ + public static LoadingCache build( + OptionalLong expiresAfterWriteMillis, + OptionalLong refreshMillis, + long maximumSize, + boolean recordStats, + CacheLoader cacheLoader) + { + requireNonNull(cacheLoader, "cacheLoader is null"); + + CacheBuilder tokenCache = CacheBuilder.newBuilder(); + CacheBuilder dataCache = CacheBuilder.newBuilder(); + if (expiresAfterWriteMillis.isPresent()) { + tokenCache.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + dataCache.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + } + + if (refreshMillis.isPresent() && (expiresAfterWriteMillis.isEmpty() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) { + dataCache.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS); + } + + tokenCache.maximumSize(maximumSize); + dataCache.maximumSize(maximumSize); + + if (recordStats) { + dataCache.recordStats(); + } + + return new EvictableLoadingCache<>( + EvictableCache.buildWith(tokenCache), + dataCache.build(new TokenCacheLoader<>(cacheLoader))); + } + + // Token is a freshness marker. tokenCache keeps the fresh token for given key. + // Cache invalidation correctness (read-after-invalidate consistency) relies on tokenCache + // invalidation. + private final EvictableCache> tokensCache; + + // dataCache keeps data for tokens, potentially including stale entries. + // dataCache invalidation is opportunistic. It's necessary for releasing memory, but + // is *not* necessary for correctness (read-after-invalidate consistency). + private final LoadingCache, V> dataCache; + + private EvictableLoadingCache(EvictableCache> tokensCache, LoadingCache, V> dataCache) + { + this.tokensCache = requireNonNull(tokensCache, "tokensCache is null"); + this.dataCache = requireNonNull(dataCache, "dataCache is null"); + } + + @Override + public V get(K key) + throws ExecutionException + { + Token token = tokensCache.get(key, () -> new Token<>(key)); + return dataCache.get(token); + } + + @Override + public ImmutableMap getAll(Iterable keys) + throws ExecutionException + { + BiMap> keyToToken = HashBiMap.create(); + for (K key : keys) { + // This is not bulk, but is fast local operation + keyToToken.put(key, tokensCache.get(key, () -> new Token<>(key))); + } + + Map, V> values = dataCache.getAll(keyToToken.values()); + + BiMap, K> tokenToKey = keyToToken.inverse(); + ImmutableMap.Builder result = ImmutableMap.builder(); + for (Map.Entry, V> entry : values.entrySet()) { + Token token = entry.getKey(); + + // While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable. + K key = tokenToKey.get(token); + checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys); + + result.put(key, entry.getValue()); + } + return result.buildOrThrow(); + } + + @Override + public V getIfPresent(Object key) + { + Token token = tokensCache.getIfPresent(key); + if (token == null) { + return null; + } + return dataCache.getIfPresent(token); + } + + @Override + public void invalidate(Object key) + { + Token token = tokensCache.getIfPresent(key); + // Invalidate irrespective of whether we got a token, to invalidate result of ongoing token load (instantiation). 
+ tokensCache.invalidate(key); + if (token != null) { + dataCache.invalidate(token); + } + } + + @Override + public void invalidateAll(Iterable keys) + { + ImmutableMap> tokens = tokensCache.getAllPresent(keys); + tokensCache.invalidateAll(keys); + // This is racy with respect to concurrent cache loading. dataCache is not guaranteed not to have entries for keys after invalidateAll(keys) returns. + // Note that dataCache invalidation is not necessary for correctness. Still one can expect that cache invalidation releases memory. + dataCache.invalidateAll(tokens.values()); + } + + @Override + public void invalidateAll() + { + tokensCache.invalidateAll(); + dataCache.invalidateAll(); + } + + @Override + public long size() + { + return dataCache.size(); + } + + @Override + public CacheStats stats() + { + return dataCache.stats(); + } + + @Override + public ConcurrentMap asMap() + { + return new ConcurrentMap() + { + private final ConcurrentMap> tokenCacheMap = tokensCache.asMap(); + private final ConcurrentMap, V> dataCacheMap = dataCache.asMap(); + + @Override + public V putIfAbsent(K key, V value) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); + } + + @Override + public boolean remove(Object key, Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); + } + + @Override + public V replace(K key, V value) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); + } + + @Override + public int size() + { + return dataCache.asMap().size(); + } + + @Override + public boolean isEmpty() + { + return dataCache.asMap().isEmpty(); + } + + @Override + public boolean containsKey(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsValue(Object value) + { + throw new UnsupportedOperationException(); + } + + @Override + public V get(Object key) + { + return getIfPresent(key); + } + + @Override + public V put(K key, V value) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead."); + } + + @Override + public V remove(Object key) + { + throw new UnsupportedOperationException(); + } + + @Override + public void putAll(Map m) + { + throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. 
Use get(key, callable) instead."); + } + + @Override + public void clear() + { + dataCacheMap.clear(); + tokenCacheMap.clear(); + } + + @Override + public Set keySet() + { + return tokenCacheMap.keySet(); + } + + @Override + public Collection values() + { + return dataCacheMap.values(); + } + + @Override + public Set> entrySet() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public void cleanUp() + { + tokensCache.cleanUp(); + dataCache.cleanUp(); + } + + // instance-based equality + private static class Token + { + private final K key; + + Token(K key) + { + this.key = requireNonNull(key, "key is null"); + } + + K getKey() + { + return key; + } + + @Override + public String toString() + { + return format("Cache Token(%s)", key); + } + } + + private static class TokenCacheLoader + extends CacheLoader, V> + { + private final CacheLoader delegate; + + public TokenCacheLoader(CacheLoader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public V load(Token token) + throws Exception + { + return delegate.load(token.getKey()); + } + + @Override + public ListenableFuture reload(Token token, V oldValue) + throws Exception + { + return delegate.reload(token.getKey(), oldValue); + } + + @Override + public Map, V> loadAll(Iterable> tokens) + throws Exception + { + List> tokenList = ImmutableList.copyOf(tokens); + List keys = new ArrayList<>(); + for (Token token : tokenList) { + keys.add(token.getKey()); + } + Map values = delegate.loadAll(keys); + + ImmutableMap.Builder, V> result = ImmutableMap.builder(); + for (int i = 0; i < tokenList.size(); i++) { + Token token = tokenList.get(i); + K key = keys.get(i); + V value = values.get(key); + // CacheLoader.loadAll is not guaranteed to return values for all the keys + if (value != null) { + result.put(token, value); + } + } + return result.buildOrThrow(); + } + + @Override + public String toString() + { + return toStringHelper(this) + .addValue(delegate) + .toString(); + } + } +} diff --git a/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableLoadingCache.java b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableLoadingCache.java new file mode 100644 index 000000000000..678f03eacad8 --- /dev/null +++ b/lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/cache/TestEvictableLoadingCache.java @@ -0,0 +1,298 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.base.cache; + +import com.google.common.cache.CacheLoader; +import com.google.common.cache.CacheStats; +import com.google.common.cache.LoadingCache; +import io.airlift.concurrent.MoreFutures; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Iterables.getOnlyElement; +import static io.trino.plugin.base.cache.CacheStatsAssertions.assertCacheStats; +import static java.lang.String.format; +import static java.util.concurrent.Executors.newFixedThreadPool; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotSame; +import static org.testng.Assert.assertSame; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; + +public class TestEvictableLoadingCache +{ + private static final int TEST_TIMEOUT_MILLIS = 10_000; + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testLoad() + throws Exception + { + LoadingCache cache = EvictableLoadingCache.build( + OptionalLong.empty(), + OptionalLong.empty(), + 10_000, + true, + CacheLoader.from((Integer ignored) -> "abc")); + + assertEquals(cache.get(42), "abc"); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testLoadStats() + throws Exception + { + LoadingCache cache = EvictableLoadingCache.build( + OptionalLong.empty(), + OptionalLong.empty(), + 10_000, + true, + CacheLoader.from((Integer ignored) -> "abc")); + + assertEquals(cache.stats(), new CacheStats(0, 0, 0, 0, 0, 0)); + + String value = assertCacheStats(cache) + .misses(1) + .loads(1) + .calling(() -> cache.get(42)); + assertEquals(value, "abc"); + + value = assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(42)); + assertEquals(value, "abc"); + + // with equal, but not the same key + value = assertCacheStats(cache) + .hits(1) + .calling(() -> cache.get(newInteger(42))); + assertEquals(value, "abc"); + } + + @SuppressModernizer + private static Integer newInteger(int value) + { + Integer integer = value; + @SuppressWarnings({"UnnecessaryBoxing", "deprecation", "BoxedPrimitiveConstructor"}) + Integer newInteger = new Integer(value); + assertNotSame(integer, newInteger); + return newInteger; + } + + /** + * Verity that implementation of {@link LoadingCache#getAll(Iterable)} returns same keys as provided, not equal ones. + * This is necessary for the case where the cache key can be equal but still distinguishable. 
+     */
+    @Test(timeOut = TEST_TIMEOUT_MILLIS)
+    public void testGetAllMaintainsKeyIdentity()
+            throws Exception
+    {
+        LoadingCache<String, Integer> cache = EvictableLoadingCache.build(
+                OptionalLong.empty(),
+                OptionalLong.empty(),
+                10_000,
+                true,
+                CacheLoader.from(String::length));
+
+        String first = "abc";
+        String second = new String(first);
+        assertNotSame(first, second);
+
+        // prime the cache
+        assertEquals((int) cache.get(first), 3);
+
+        Map<String, Integer> values = cache.getAll(List.of(second));
+        assertThat(values).hasSize(1);
+        Entry<String, Integer> entry = getOnlyElement(values.entrySet());
+        assertEquals((int) entry.getValue(), 3);
+        assertEquals(entry.getKey(), first);
+        assertEquals(entry.getKey(), second);
+        assertNotSame(entry.getKey(), first);
+        assertSame(entry.getKey(), second);
+    }
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test(timeOut = TEST_TIMEOUT_MILLIS, dataProviderClass = Invalidation.class, dataProvider = "invalidations")
+    public void testInvalidateOngoingLoad(Invalidation invalidation)
+            throws Exception
+    {
+        ConcurrentMap<Integer, String> remoteState = new ConcurrentHashMap<>();
+        Integer key = 42;
+        remoteState.put(key, "stale value");
+
+        CountDownLatch loadOngoing = new CountDownLatch(1);
+        CountDownLatch invalidated = new CountDownLatch(1);
+        CountDownLatch getReturned = new CountDownLatch(1);
+
+        LoadingCache<Integer, String> cache = EvictableLoadingCache.build(
+                OptionalLong.empty(),
+                OptionalLong.empty(),
+                10_000,
+                false,
+                new CacheLoader<>()
+                {
+                    @Override
+                    public String load(Integer key)
+                            throws Exception
+                    {
+                        String value = remoteState.get(key);
+                        loadOngoing.countDown(); // 1
+                        assertTrue(invalidated.await(10, SECONDS)); // 2
+                        return value;
+                    }
+                });
+
+        ExecutorService executor = newFixedThreadPool(2);
+        try {
+            // thread A
+            Future<String> threadA = executor.submit(() -> {
+                String value = cache.get(key);
+                getReturned.countDown(); // 3
+                return value;
+            });
+
+            // thread B
+            Future<String> threadB = executor.submit(() -> {
+                assertTrue(loadOngoing.await(10, SECONDS)); // 1
+
+                switch (invalidation) {
+                    case INVALIDATE_KEY:
+                        cache.invalidate(key);
+                        break;
+                    case INVALIDATE_PREDEFINED_KEYS:
+                        cache.invalidateAll(List.of(key));
+                        break;
+                    case INVALIDATE_SELECTED_KEYS:
+                        Set<Integer> keys = cache.asMap().keySet().stream()
+                                .filter(foundKey -> (int) foundKey == key)
+                                .collect(toImmutableSet());
+                        cache.invalidateAll(keys);
+                        break;
+                    case INVALIDATE_ALL:
+                        cache.invalidateAll();
+                        break;
+                }
+
+                remoteState.put(key, "fresh value");
+                invalidated.countDown(); // 2
+                // Cache may persist value after loader returned, but before `cache.get(...)` returned. Ensure the latter completed.
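+                // A plain Guava cache could still install the "stale value" computed by the load that
+                // started before the invalidation (guava#1881); the read below expects the invalidation
+                // to discard that in-flight result instead.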
+                assertTrue(getReturned.await(10, SECONDS)); // 3
+
+                return cache.get(key);
+            });
+
+            assertEquals(threadA.get(), "stale value");
+            assertEquals(threadB.get(), "fresh value");
+        }
+        finally {
+            executor.shutdownNow();
+            executor.awaitTermination(10, SECONDS);
+        }
+    }
+
+    /**
+     * Covers https://github.com/google/guava/issues/1881
+     */
+    @Test(invocationCount = 10, timeOut = TEST_TIMEOUT_MILLIS, dataProviderClass = Invalidation.class, dataProvider = "invalidations")
+    public void testInvalidateAndLoadConcurrently(Invalidation invalidation)
+            throws Exception
+    {
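+        // Each thread multiplies the shared value by its own prime, so however loads and invalidations
+        // interleave, the final value must be the product of all primes and every thread must see its
+        // own factor when it subsequently reads through the cache.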
+        int[] primes = {2, 3, 5, 7};
+
+        Integer key = 42;
+        Map<Integer, AtomicLong> remoteState = new ConcurrentHashMap<>();
+        remoteState.put(key, new AtomicLong(1));
+
+        LoadingCache<Integer, Long> cache = EvictableLoadingCache.build(
+                OptionalLong.empty(),
+                OptionalLong.empty(),
+                10_000,
+                false,
+                CacheLoader.from(i -> remoteState.get(i).get()));
+
+        int threads = 4;
+        CyclicBarrier barrier = new CyclicBarrier(threads);
+        ExecutorService executor = newFixedThreadPool(threads);
+        try {
+            List<Future<Void>> futures = IntStream.range(0, threads)
+                    .mapToObj(threadNumber -> executor.submit(() -> {
+                        // prime the cache
+                        assertEquals((long) cache.get(key), 1L);
+                        int prime = primes[threadNumber];
+
+                        barrier.await(10, SECONDS);
+
+                        // modify underlying state
+                        remoteState.get(key).updateAndGet(current -> current * prime);
+
+                        // invalidate
+                        switch (invalidation) {
+                            case INVALIDATE_KEY:
+                                cache.invalidate(key);
+                                break;
+                            case INVALIDATE_PREDEFINED_KEYS:
+                                cache.invalidateAll(List.of(key));
+                                break;
+                            case INVALIDATE_SELECTED_KEYS:
+                                Set<Integer> keys = cache.asMap().keySet().stream()
+                                        .filter(foundKey -> (int) foundKey == key)
+                                        .collect(toImmutableSet());
+                                cache.invalidateAll(keys);
+                                break;
+                            case INVALIDATE_ALL:
+                                cache.invalidateAll();
+                                break;
+                        }
+
+                        // read through cache
+                        long current = cache.get(key);
+                        if (current % prime != 0) {
+                            fail(format("The value read through cache (%s) in thread (%s) is not divisible by (%s)", current, threadNumber, prime));
+                        }
+
+                        return (Void) null;
+                    }))
+                    .collect(toImmutableList());
+
+            futures.forEach(MoreFutures::getFutureValue);
+
+            assertEquals(remoteState.keySet(), Set.of(key));
+            assertEquals(remoteState.get(key).get(), 2 * 3 * 5 * 7);
+            assertEquals((long) cache.get(key), 2 * 3 * 5 * 7);
+        }
+        finally {
+            executor.shutdownNow();
+            executor.awaitTermination(10, SECONDS);
+        }
+    }
+}
diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
index ab77fd75b45b..095138b9ceb5 100644
--- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
+++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hive.metastore.cache;
 
-import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
@@ -23,6 +22,7 @@
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import io.airlift.jmx.CacheStatsMBean;
 import io.airlift.units.Duration;
+import io.trino.plugin.base.cache.EvictableLoadingCache;
 import io.trino.plugin.hive.HivePartition;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.PartitionNotFoundException;
@@ -91,7 +91,6 @@
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastoreConfig.isCacheEnabled;
 import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.hadoop.hive.common.FileUtils.makePartName;
 
 /**
@@ -1026,21 +1025,20 @@ private static <K, V> LoadingCache<K, V> buildCache(
     {
         CacheLoader<K, V> cacheLoader = CacheLoader.from(loader);
 
-        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
-        if (expiresAfterWriteMillis.isPresent()) {
-            cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS);
-        }
         checkArgument(refreshMillis.isEmpty() || refreshExecutor.isPresent(), "refreshMillis is provided but refreshExecutor is not");
         if (refreshMillis.isPresent() && (expiresAfterWriteMillis.isEmpty() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) {
-            cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS);
             cacheLoader = asyncReloading(cacheLoader, refreshExecutor.orElseThrow(() -> new IllegalArgumentException("Executor not provided")));
         }
-        cacheBuilder.maximumSize(maximumSize);
-        if (statsRecording == StatsRecording.ENABLED) {
-            cacheBuilder.recordStats();
+        else {
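+            // Refresh was either not requested, or it would not fire before the entry expires; build the cache without refresh.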
+            refreshMillis = OptionalLong.empty();
         }
-        return cacheBuilder.build(cacheLoader);
+        return EvictableLoadingCache.build(
+                expiresAfterWriteMillis,
+                refreshMillis,
+                maximumSize,
+                statsRecording == StatsRecording.ENABLED,
+                cacheLoader);
     }
 
     private static <K, V> LoadingCache<K, V> buildCache(
@@ -1067,19 +1065,13 @@ public Map<K, V> loadAll(Iterable<? extends K> keys)
             }
         };
 
-        CacheBuilder<Object, Object> cacheBuilder = CacheBuilder.newBuilder();
-        if (expiresAfterWriteMillis.isPresent()) {
-            cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS);
-        }
-
-        // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests
-
-        cacheBuilder.maximumSize(maximumSize);
-        if (statsRecording == StatsRecording.ENABLED) {
-            cacheBuilder.recordStats();
-        }
-
-        return cacheBuilder.build(cacheLoader);
+        return EvictableLoadingCache.build(
+                expiresAfterWriteMillis,
+                // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests
+                OptionalLong.empty(),
+                maximumSize,
+                statsRecording == StatsRecording.ENABLED,
+                cacheLoader);
     }
 
     private static class WithIdentity
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
index f63437d14a18..d2e17253559c 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/UnimplementedHiveMetastore.java
@@ -29,7 +29,7 @@
 import java.util.Set;
 import java.util.function.Function;
 
-class UnimplementedHiveMetastore
+public class UnimplementedHiveMetastore
         implements HiveMetastore
 {
     @Override
diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
index 11a11a3cdd65..ea764a4f86fb 100644
--- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
+++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/metastore/cache/TestCachingHiveMetastore.java
@@ -17,16 +17,20 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ListeningExecutorService;
+import io.airlift.log.Logger;
 import io.airlift.units.Duration;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.HiveConfig;
 import io.trino.plugin.hive.HiveMetastoreClosure;
 import io.trino.plugin.hive.PartitionStatistics;
 import io.trino.plugin.hive.authentication.HiveIdentity;
+import io.trino.plugin.hive.metastore.Column;
+import io.trino.plugin.hive.metastore.HiveMetastore;
 import io.trino.plugin.hive.metastore.HivePrincipal;
 import io.trino.plugin.hive.metastore.MetastoreConfig;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Table;
+import io.trino.plugin.hive.metastore.UnimplementedHiveMetastore;
 import io.trino.plugin.hive.metastore.thrift.BridgingHiveMetastore;
 import io.trino.plugin.hive.metastore.thrift.MetastoreLocator;
 import io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient;
@@ -38,26 +42,40 @@
 import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.predicate.ValueSet;
+import io.trino.testing.DataProviders;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.slice.Slices.utf8Slice;
 import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.PARTITION_KEY;
 import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn;
+import static io.trino.plugin.hive.HiveStorageFormat.TEXTFILE;
 import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
 import static io.trino.plugin.hive.HiveType.HIVE_STRING;
+import static io.trino.plugin.hive.HiveType.toHiveType;
 import static io.trino.plugin.hive.metastore.HiveColumnStatistics.createIntegerColumnStatistics;
 import static io.trino.plugin.hive.metastore.MetastoreUtil.computePartitionKeyFilter;
+import static io.trino.plugin.hive.metastore.StorageFormat.fromHiveStorageFormat;
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.cachingHiveMetastore;
 import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.memoizeMetastore;
 import static io.trino.plugin.hive.metastore.thrift.MockThriftMetastoreClient.BAD_DATABASE;
@@ -75,7 +93,9 @@
 import static io.trino.spi.type.VarcharType.VARCHAR;
 import static io.trino.testing.TestingConnectorSession.SESSION;
 import static java.util.concurrent.Executors.newCachedThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.function.Function.identity;
+import static org.apache.hadoop.hive.metastore.TableType.EXTERNAL_TABLE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -85,6 +105,8 @@
 @Test(singleThreaded = true)
 public class TestCachingHiveMetastore
 {
+    private static final Logger log = Logger.get(TestCachingHiveMetastore.class);
+
     private static final HiveIdentity IDENTITY = new HiveIdentity(SESSION);
     private static final PartitionStatistics TEST_STATS = PartitionStatistics.builder()
             .setColumnStatistics(ImmutableMap.of(TEST_COLUMN, createIntegerColumnStatistics(OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty(), OptionalLong.empty())))
@@ -468,6 +490,182 @@ public void testCachingHiveMetastoreCreationViaMemoize()
         assertEquals(metastore.getDatabaseNamesStats().getRequestCount(), 0);
     }
 
+    @Test(timeOut = 60_000, dataProviderClass = DataProviders.class, dataProvider = "trueFalse")
+    public void testLoadAfterInvalidate(boolean invalidateAll)
+            throws Exception
+    {
+        // State
+        CopyOnWriteArrayList<Column> tableColumns = new CopyOnWriteArrayList<>();
+        ConcurrentMap<String, Partition> tablePartitionsByName = new ConcurrentHashMap<>();
+        Map<String, String> tableParameters = new ConcurrentHashMap<>();
+        tableParameters.put("frequent-changing-table-parameter", "parameter initial value");
+
+        // Initialize data
+        String databaseName = "my_database";
+        String tableName = "my_table_name";
+
+        tableColumns.add(new Column("value", toHiveType(VARCHAR), Optional.empty() /* comment */));
+        tableColumns.add(new Column("pk", toHiveType(VARCHAR), Optional.empty() /* comment */));
+
+        List<String> partitionNames = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            String partitionName = "pk=" + i;
+            tablePartitionsByName.put(
+                    partitionName,
+                    Partition.builder()
+                            .setDatabaseName(databaseName)
+                            .setTableName(tableName)
+                            .setColumns(ImmutableList.copyOf(tableColumns))
+                            .setValues(List.of(Integer.toString(i)))
+                            .withStorage(storage -> storage.setStorageFormat(fromHiveStorageFormat(TEXTFILE)))
+                            .setParameters(Map.of("frequent-changing-partition-parameter", "parameter initial value"))
+                            .build());
+            partitionNames.add(partitionName);
+        }
+
+        // Mock metastore
+        CountDownLatch getTableEnteredLatch = new CountDownLatch(1);
+        CountDownLatch getTableReturnLatch = new CountDownLatch(1);
+        CountDownLatch getTableFinishedLatch = new CountDownLatch(1);
+        CountDownLatch getPartitionsByNamesEnteredLatch = new CountDownLatch(1);
+        CountDownLatch getPartitionsByNamesReturnLatch = new CountDownLatch(1);
+        CountDownLatch getPartitionsByNamesFinishedLatch = new CountDownLatch(1);
+
+        HiveMetastore mockMetastore = new UnimplementedHiveMetastore()
+        {
+            @Override
+            public Optional<Table> getTable(HiveIdentity identity, String databaseName, String tableName)
+            {
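+                // Capture the table (with the current parameters) before blocking, so the value returned
+                // here is already stale once the main thread has mutated the parameters and invalidated
+                // the cache (latches 1 and 2 below).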
+                Optional<Table> table = Optional.of(Table.builder()
+                        .setDatabaseName(databaseName)
+                        .setTableName(tableName)
+                        .setTableType(EXTERNAL_TABLE.name())
+                        .setDataColumns(tableColumns)
+                        .setParameters(ImmutableMap.copyOf(tableParameters))
+                        // Required by 'Table', but not used by view translation.
+                        .withStorage(storage -> storage.setStorageFormat(fromHiveStorageFormat(TEXTFILE)))
+                        .setOwner(Optional.empty())
+                        .build());
+
+                getTableEnteredLatch.countDown(); // 1
+                await(getTableReturnLatch, 10, SECONDS); // 2
+
+                return table;
+            }
+
+            @Override
+            public Map<String, Optional<Partition>> getPartitionsByNames(HiveIdentity identity, Table table, List<String> partitionNames)
+            {
+                Map<String, Optional<Partition>> result = new HashMap<>();
+                for (String partitionName : partitionNames) {
+                    result.put(partitionName, Optional.ofNullable(tablePartitionsByName.get(partitionName)));
+                }
+
+                getPartitionsByNamesEnteredLatch.countDown(); // loader#1
+                await(getPartitionsByNamesReturnLatch, 10, SECONDS); // loader#2
+
+                return result;
+            }
+
+            @Override
+            public boolean isImpersonationEnabled()
+            {
+                return false;
+            }
+        };
+
+        // Caching metastore
+        metastore = cachingHiveMetastore(
+                mockMetastore,
+                executor,
+                new Duration(5, TimeUnit.MINUTES),
+                Optional.of(new Duration(1, TimeUnit.MINUTES)),
+                1000);
+
+        // The test. Main thread does modifications and verifies subsequent load sees them. Background thread loads the state into the cache.
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            Future<Void> future = executor.submit(() -> {
+                try {
+                    Table table;
+
+                    table = metastore.getTable(IDENTITY, databaseName, tableName).orElseThrow();
+                    getTableFinishedLatch.countDown(); // 3
+
+                    metastore.getPartitionsByNames(IDENTITY, table, partitionNames);
+                    getPartitionsByNamesFinishedLatch.countDown(); // 6
+
+                    return (Void) null;
+                }
+                catch (Throwable e) {
+                    log.error(e);
+                    throw e;
+                }
+            });
+
+            await(getTableEnteredLatch, 10, SECONDS); // 1
+            tableParameters.put("frequent-changing-table-parameter", "main-thread-put-xyz");
+            if (invalidateAll) {
+                metastore.flushCache();
+            }
+            else {
+                metastore.invalidateTable(databaseName, tableName);
+            }
+            getTableReturnLatch.countDown(); // 2
+            await(getTableFinishedLatch, 10, SECONDS); // 3
+            Table table = metastore.getTable(IDENTITY, databaseName, tableName).orElseThrow();
+            assertThat(table.getParameters())
+                    .isEqualTo(Map.of("frequent-changing-table-parameter", "main-thread-put-xyz"));
+
+            await(getPartitionsByNamesEnteredLatch, 10, SECONDS); // 4
+            String partitionName = partitionNames.get(2);
+            Map<String, String> newPartitionParameters = Map.of("frequent-changing-partition-parameter", "main-thread-put-alice");
+            tablePartitionsByName.put(partitionName,
+                    Partition.builder(tablePartitionsByName.get(partitionName))
+                            .setParameters(newPartitionParameters)
+                            .build());
+            if (invalidateAll) {
+                metastore.flushCache();
+            }
+            else {
+                metastore.invalidateTable(databaseName, tableName);
+            }
+            getPartitionsByNamesReturnLatch.countDown(); // 5
+            await(getPartitionsByNamesFinishedLatch, 10, SECONDS); // 6
+            Map<String, Optional<Partition>> loadedPartitions = metastore.getPartitionsByNames(IDENTITY, table, partitionNames);
+            assertThat(loadedPartitions.get(partitionName))
+                    .isNotNull()
+                    .isPresent()
+                    .hasValueSatisfying(partition -> assertThat(partition.getParameters()).isEqualTo(newPartitionParameters));
+
+            // verify no failure in the background thread
+            future.get(10, SECONDS);
+        }
+        finally {
+            getTableEnteredLatch.countDown();
+            getTableReturnLatch.countDown();
+            getTableFinishedLatch.countDown();
+            getPartitionsByNamesEnteredLatch.countDown();
+            getPartitionsByNamesReturnLatch.countDown();
+            getPartitionsByNamesFinishedLatch.countDown();
+
+            executor.shutdownNow();
+            executor.awaitTermination(10, SECONDS);
+        }
+    }
+
+    private static void await(CountDownLatch latch, long timeout, TimeUnit unit)
+    {
+        try {
+            boolean awaited = latch.await(timeout, unit);
+            checkState(awaited, "wait timed out");
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
     private CachingHiveMetastore createMetastoreWithDirectExecutor(CachingHiveMetastoreConfig config)
     {
         return (CachingHiveMetastore) cachingHiveMetastore(