From 39046208f1f96844f8d1bc2984f428af10aca713 Mon Sep 17 00:00:00 2001
From: Piotr Findeisen
Date: Thu, 3 Feb 2022 17:15:37 +0100
Subject: [PATCH] Simplify evictable cache implementation

Reusing the token-based approach of `EvictableLoadingCache` allows
implementing both evictable `Cache` (previously known as `EvictableCache`)
and evictable `LoadingCache` (`EvictableLoadingCache`) in a concise manner.

This also introduces a builder for the caches (`EvictableCacheBuilder`),
which unlocks the flexibility of `CacheBuilder` (like weighted cache entries),
while still preventing unsupported usage patterns. For example, registering a
`removalListener` is not allowed, as the removal listener is used internally
by the cache implementation.
---
 lib/trino-collect/pom.xml                     |   5 +
 .../trino/collect/cache/EvictableCache.java   | 371 +++++++++++------
 .../collect/cache/EvictableCacheBuilder.java  | 194 +++++++++
 .../collect/cache/EvictableLoadingCache.java  | 382 ------------------
 .../io/trino/collect/cache/MoreFutures.java   |  11 -
 .../collect/cache/TestEvictableCache.java     |  81 +++-
 .../cache/TestEvictableLoadingCache.java      | 106 +++--
 .../trino/plugin/jdbc/CachingJdbcClient.java  |  24 +-
 .../plugin/jdbc/TestCachingJdbcClient.java    |  22 +-
 .../metastore/cache/CachingHiveMetastore.java |  44 +-
 .../io/trino/plugin/mongodb/MongoSession.java |  11 +-
 11 files changed, 652 insertions(+), 599 deletions(-)
 create mode 100644 lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java
 delete mode 100644 lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableLoadingCache.java
 rename lib/trino-collect/src/{main => test}/java/io/trino/collect/cache/MoreFutures.java (82%)

diff --git a/lib/trino-collect/pom.xml b/lib/trino-collect/pom.xml
index 21ce82fc9a34..eaae0419457b 100644
--- a/lib/trino-collect/pom.xml
+++ b/lib/trino-collect/pom.xml
@@ -23,6 +23,11 @@
             <artifactId>jsr305</artifactId>
         </dependency>

+        <dependency>
+            <groupId>com.google.errorprone</groupId>
+            <artifactId>error_prone_annotations</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
index f551998a41ae..ac1d55b96658 100644
--- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
+++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCache.java
@@ -13,184 +13,229 @@
  */
 package io.trino.collect.cache;

-import com.google.common.cache.AbstractCache;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.AbstractLoadingCache;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
 import com.google.common.cache.CacheStats;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.BiMap;
+import com.google.common.collect.HashBiMap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.gaul.modernizer_maven_annotations.SuppressModernizer;

 import javax.annotation.CheckForNull;

+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
-import
java.util.concurrent.Future; -import static io.trino.collect.cache.MoreFutures.getDone; -import static java.lang.System.nanoTime; +import static com.google.common.base.MoreObjects.toStringHelper; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.base.Verify.verify; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; /** - * A {@link Cache} implementation similar to ones produced by {@link CacheBuilder#build()}, but one that does not exhibit - * Guava issue #1881: a cache inspection with - * {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return fresh state after - * {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. + * A {@link Cache} and {@link LoadingCache} implementation similar to ones produced by {@link CacheBuilder#build()}, + * but one that does not exhibit Guava issue #1881: + * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object, Callable)} is guaranteed to return + * fresh state after {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. + * + * @see EvictableCacheBuilder */ -public class EvictableCache - extends AbstractCache - implements Cache +class EvictableCache + extends AbstractLoadingCache + implements LoadingCache { - /** - * @apiNote Piggy-back on {@link CacheBuilder} for cache TTL. - */ - public static EvictableCache buildWith(CacheBuilder cacheBuilder) - { - return new EvictableCache<>(cacheBuilder); - } - - // private final Map> map = new ConcurrentHashMap<>(); - private final Cache> delegate; - - private final StatsCounter statsCounter = new SimpleStatsCounter(); - - private EvictableCache(CacheBuilder cacheBuilder) + // Invariant: for every (K, token) entry in the tokens map, there is a live + // cache entry (token, ?) in dataCache, that, upon eviction, will cause the tokens' + // entry to be removed. + private final ConcurrentHashMap> tokens = new ConcurrentHashMap<>(); + // The dataCache can have entries with no corresponding tokens in the tokens map. + // For example, this can happen when invalidation concurs with load. + // The dataCache must be bounded. + private final LoadingCache, V> dataCache; + + EvictableCache(CacheBuilder, ? super V> cacheBuilder, CacheLoader cacheLoader) { - requireNonNull(cacheBuilder, "cacheBuilder is null"); - this.delegate = buildUnsafeCache(cacheBuilder); + dataCache = buildUnsafeCache( + cacheBuilder + ., V>removalListener(removal -> { + Token token = removal.getKey(); + verify(token != null, "token is null"); + tokens.remove(token.getKey(), token); + }), + new TokenCacheLoader<>(cacheLoader)); } - @SuppressModernizer // CacheBuilder.build() is forbidden, advising to use this class as a safety-adding wrapper. - private static Cache buildUnsafeCache(CacheBuilder cacheBuilder) + @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, advising to use this class as a safety-adding wrapper. 
+ private static LoadingCache buildUnsafeCache(CacheBuilder cacheBuilder, CacheLoader cacheLoader) { - return cacheBuilder.build(); + return cacheBuilder.build(cacheLoader); } @CheckForNull @Override public V getIfPresent(Object key) { - Future future = delegate.getIfPresent(key); - if (future != null && future.isDone()) { - statsCounter.recordHits(1); - return getDone(future); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.get(key); + if (token == null) { + return null; } - statsCounter.recordMisses(1); - return null; + return dataCache.getIfPresent(token); } @Override - public V get(K key, Callable loader) + public V get(K key, Callable valueLoader) throws ExecutionException { - requireNonNull(key, "key is null"); - requireNonNull(loader, "loader is null"); - - while (true) { - SettableFuture newFuture = SettableFuture.create(); - Future future = delegate.asMap().computeIfAbsent(key, ignored -> newFuture); - if (future.isDone() && !future.isCancelled()) { - statsCounter.recordHits(1); - return getDone(future); + Token newToken = new Token<>(key); + Token token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + return dataCache.get(token, valueLoader); + } + catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); } + throw e; + } + } - statsCounter.recordMisses(1); - if (future == newFuture) { - // We put the future in. + @Override + public V get(K key) + throws ExecutionException + { + Token newToken = new Token<>(key); + Token token = tokens.computeIfAbsent(key, ignored -> newToken); + try { + return dataCache.get(token); + } + catch (Throwable e) { + if (newToken == token) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(key, newToken); + } + throw e; + } + } - V computed; - long loadStartNanos = nanoTime(); - try { - computed = loader.call(); - requireNonNull(computed, "computed is null"); - } - catch (Exception e) { - statsCounter.recordLoadException(nanoTime() - loadStartNanos); - delegate.asMap().remove(key, newFuture); - // wake up waiters, let them retry - newFuture.cancel(false); - throw new ExecutionException(e); + @Override + public ImmutableMap getAll(Iterable keys) + throws ExecutionException + { + List> newTokens = new ArrayList<>(); + try { + BiMap> keyToToken = HashBiMap.create(); + for (K key : keys) { + // This is not bulk, but is fast local operation + Token newToken = new Token<>(key); + Token token = tokens.computeIfAbsent(key, ignored -> newToken); + keyToToken.put(key, token); + if (token == newToken) { + newTokens.add(newToken); } - statsCounter.recordLoadSuccess(nanoTime() - loadStartNanos); - newFuture.set(computed); - return computed; } - // Someone else is loading the key, let's wait. 
- try { - return future.get(); - } - catch (CancellationException e) { - // Invalidated, or load failed - } - catch (ExecutionException e) { - // Should never happen - throw new IllegalStateException("Future unexpectedly completed with exception", e); + Map, V> values = dataCache.getAll(keyToToken.values()); + + BiMap, K> tokenToKey = keyToToken.inverse(); + ImmutableMap.Builder result = ImmutableMap.builder(); + for (Map.Entry, V> entry : values.entrySet()) { + Token token = entry.getKey(); + + // While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable. + K key = tokenToKey.get(token); + checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys); + + result.put(key, entry.getValue()); } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted", e); + return result.buildOrThrow(); + } + catch (Throwable e) { + for (Token token : newTokens) { + // Failed to load and it was our new token persisted in tokens map. + // No cache entry exists for the token (unless concurrent load happened), + // so we need to remove it. + tokens.remove(token.getKey(), token); } - - // Someone else was loading the key, but the load was invalidated. + throw e; } } @Override - public void put(K key, V value) + public void refresh(K key) { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead."); + // The refresh loads a new entry, if it wasn't in the cache yet. Thus, we would create a new Token. + // However, dataCache.refresh is asynchronous and may fail, so no cache entry may be created. + // In such case we would leak the newly created token. + throw new UnsupportedOperationException(); } @Override - public void invalidate(Object key) + public long size() { - delegate.invalidate(key); + return dataCache.size(); } @Override - public void invalidateAll(Iterable keys) + public void cleanUp() { - delegate.invalidateAll(keys); + dataCache.cleanUp(); + } + + @VisibleForTesting + int tokensCount() + { + return tokens.size(); } @Override - public void invalidateAll() + public void invalidate(Object key) { - delegate.invalidateAll(); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.remove(key); + if (token != null) { + dataCache.invalidate(token); + } } @Override - public long size() + public void invalidateAll() { - // Includes entries being computed. Approximate, as allowed per method contract. - return delegate.size(); + dataCache.invalidateAll(); + tokens.clear(); } @Override public CacheStats stats() { - return statsCounter.snapshot().plus( - new CacheStats( - 0, - 0, - 0, - 0, - 0, - delegate.stats().evictionCount())); + return dataCache.stats(); } @Override public ConcurrentMap asMap() { - ConcurrentMap> delegate = this.delegate.asMap(); return new ConcurrentMap() { + private final ConcurrentMap, V> dataCacheMap = dataCache.asMap(); + @Override public V putIfAbsent(K key, V value) { @@ -200,14 +245,22 @@ public V putIfAbsent(K key, V value) @Override public boolean remove(Object key, Object value) { - // We could use delegate.compute(key, ..) 
to check existence and remove, but compute takes `K key` and we have `Object` - throw new UnsupportedOperationException(); + @SuppressWarnings("SuspiciousMethodCalls") // Object passed to map as key K + Token token = tokens.get(key); + if (token != null) { + return dataCacheMap.remove(token, value); + } + return false; } @Override public boolean replace(K key, V oldValue, V newValue) { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); + Token token = tokens.get(key); + if (token != null) { + return dataCacheMap.replace(token, oldValue, newValue); + } + return false; } @Override @@ -219,30 +272,25 @@ public V replace(K key, V value) @Override public int size() { - return delegate.size(); + return dataCache.asMap().size(); } @Override public boolean isEmpty() { - return delegate.isEmpty(); + return dataCache.asMap().isEmpty(); } @Override public boolean containsKey(Object key) { - return delegate.containsKey(key); + return tokens.containsKey(key); } @Override public boolean containsValue(Object value) { - for (Future future : delegate.values()) { - if (future.isDone() && !future.isCancelled() && Objects.equals(getDone(future), value)) { - return true; - } - } - return false; + return values().contains(value); } @Override @@ -260,9 +308,9 @@ public V put(K key, V value) @Override public V remove(Object key) { - Future future = delegate.remove(key); - if (future != null && future.isDone() && !future.isCancelled()) { - return getDone(future); + Token token = tokens.remove(key); + if (token != null) { + return dataCacheMap.remove(token); } return null; } @@ -276,33 +324,106 @@ public void putAll(Map m) @Override public void clear() { - delegate.clear(); + dataCacheMap.clear(); + tokens.clear(); } @Override public Set keySet() { - return delegate.keySet(); + return tokens.keySet(); } @Override public Collection values() { - // values() should be a view, but also, it has a size and, iterating values shouldn't throw for incomplete futures. 
- throw new UnsupportedOperationException(); + return dataCacheMap.values(); } @Override - public Set> entrySet() + public Set> entrySet() { throw new UnsupportedOperationException(); } }; } - @Override - public void cleanUp() + // instance-based equality + static final class Token + { + private final K key; + + Token(K key) + { + this.key = requireNonNull(key, "key is null"); + } + + K getKey() + { + return key; + } + + @Override + public String toString() + { + return format("CacheToken(%s; %s)", Integer.toHexString(hashCode()), key); + } + } + + private static class TokenCacheLoader + extends CacheLoader, V> { - delegate.cleanUp(); + private final CacheLoader delegate; + + public TokenCacheLoader(CacheLoader delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public V load(Token token) + throws Exception + { + return delegate.load(token.getKey()); + } + + @Override + public ListenableFuture reload(Token token, V oldValue) + throws Exception + { + return delegate.reload(token.getKey(), oldValue); + } + + @Override + public Map, V> loadAll(Iterable> tokens) + throws Exception + { + List> tokenList = ImmutableList.copyOf(tokens); + List keys = new ArrayList<>(); + for (Token token : tokenList) { + keys.add(token.getKey()); + } + Map values = delegate.loadAll(keys); + + ImmutableMap.Builder, V> result = ImmutableMap.builder(); + for (int i = 0; i < tokenList.size(); i++) { + Token token = tokenList.get(i); + K key = keys.get(i); + V value = values.get(key); + // CacheLoader.loadAll is not guaranteed to return values for all the keys + if (value != null) { + result.put(token, value); + } + } + return result.buildOrThrow(); + } + + @Override + public String toString() + { + return toStringHelper(this) + .addValue(delegate) + .toString(); + } } } diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java new file mode 100644 index 000000000000..fbc4dc39be21 --- /dev/null +++ b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableCacheBuilder.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.collect.cache; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.cache.Weigher; +import com.google.errorprone.annotations.CheckReturnValue; +import io.trino.collect.cache.EvictableCache.Token; +import org.gaul.modernizer_maven_annotations.SuppressModernizer; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static com.google.common.base.Preconditions.checkState; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * Builder for {@link Cache} and {@link LoadingCache} instances, similar to {@link CacheBuilder}, + * but creating cache implementations that do not exhibit Guava issue #1881: + * a cache inspection with {@link Cache#getIfPresent(Object)} or {@link Cache#get(Object, Callable)} is guaranteed to return + * fresh state after {@link Cache#invalidate(Object)}, {@link Cache#invalidateAll(Iterable)} or {@link Cache#invalidateAll()} were called. + */ +public final class EvictableCacheBuilder +{ + @CheckReturnValue + public static EvictableCacheBuilder newBuilder() + { + return new EvictableCacheBuilder<>(); + } + + private Optional expireAfterWrite = Optional.empty(); + private Optional refreshAfterWrite = Optional.empty(); + private Optional maximumSize = Optional.empty(); + private Optional maximumWeight = Optional.empty(); + private Optional, ? super V>> weigher = Optional.empty(); + private boolean recordStats; + + private EvictableCacheBuilder() {} + + public EvictableCacheBuilder expireAfterWrite(long duration, TimeUnit unit) + { + return expireAfterWrite(toDuration(duration, unit)); + } + + public EvictableCacheBuilder expireAfterWrite(Duration duration) + { + checkState(!this.expireAfterWrite.isPresent(), "expireAfterWrite already set"); + this.expireAfterWrite = Optional.of(duration); + return this; + } + + public EvictableCacheBuilder refreshAfterWrite(long duration, TimeUnit unit) + { + return refreshAfterWrite(toDuration(duration, unit)); + } + + public EvictableCacheBuilder refreshAfterWrite(Duration duration) + { + checkState(!this.refreshAfterWrite.isPresent(), "refreshAfterWrite already set"); + this.refreshAfterWrite = Optional.of(duration); + return this; + } + + public EvictableCacheBuilder maximumSize(long maximumSize) + { + checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + this.maximumSize = Optional.of(maximumSize); + return this; + } + + public EvictableCacheBuilder maximumWeight(long maximumWeight) + { + checkState(!this.maximumWeight.isPresent(), "maximumWeight already set"); + checkState(!this.maximumSize.isPresent(), "maximumSize already set"); + this.maximumWeight = Optional.of(maximumWeight); + return this; + } + + public EvictableCacheBuilder weigher(Weigher weigher) + { + checkState(!this.weigher.isPresent(), "weigher already set"); + @SuppressWarnings("unchecked") // see com.google.common.cache.CacheBuilder.weigher + EvictableCacheBuilder cast = (EvictableCacheBuilder) this; + cast.weigher = Optional.of(new TokenWeigher<>(weigher)); + return cast; + } + + public EvictableCacheBuilder recordStats() + { + recordStats = true; + return this; + } + + @CheckReturnValue + public Cache build() + { + return build(unimplementedCacheLoader()); + 
} + + @CheckReturnValue + public LoadingCache build(CacheLoader loader) + { + if (cacheDisabled()) { + // Disabled cache is always empty, so doesn't exhibit invalidation problems. + // Avoid overhead of EvictableCache wrapper. + CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + .maximumSize(0) + .expireAfterWrite(0, SECONDS); + if (recordStats) { + cacheBuilder.recordStats(); + } + return buildUnsafeCache(cacheBuilder, loader); + } + + if (!(maximumSize.isPresent() || maximumWeight.isPresent() || expireAfterWrite.isPresent())) { + // EvictableCache invalidation (e.g. invalidateAll) happening concurrently with a load may + // lead to an entry remaining in the cache, without associated token. This would lead to + // a memory leak in an unbounded cache. + throw new IllegalStateException("Unbounded cache is not supported"); + } + + // CacheBuilder is further modified in EvictableCache::new, so cannot be shared between build() calls. + CacheBuilder cacheBuilder = CacheBuilder.newBuilder(); + expireAfterWrite.ifPresent(cacheBuilder::expireAfterWrite); + refreshAfterWrite.ifPresent(cacheBuilder::refreshAfterWrite); + maximumSize.ifPresent(cacheBuilder::maximumSize); + maximumWeight.ifPresent(cacheBuilder::maximumWeight); + weigher.ifPresent(cacheBuilder::weigher); + if (recordStats) { + cacheBuilder.recordStats(); + } + return new EvictableCache<>(cacheBuilder, loader); + } + + private boolean cacheDisabled() + { + return (maximumSize.isPresent() && maximumSize.get() == 0) || + (expireAfterWrite.isPresent() && expireAfterWrite.get().isZero()); + } + + @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, advising to use this class as a safety-adding wrapper. + private static LoadingCache buildUnsafeCache(CacheBuilder cacheBuilder, CacheLoader cacheLoader) + { + return cacheBuilder.build(cacheLoader); + } + + private static CacheLoader unimplementedCacheLoader() + { + return CacheLoader.from(ignored -> { + throw new UnsupportedOperationException(); + }); + } + + private static final class TokenWeigher + implements Weigher, V> + { + private final Weigher delegate; + + public TokenWeigher(Weigher delegate) + { + this.delegate = requireNonNull(delegate, "delegate is null"); + } + + @Override + public int weigh(Token key, V value) + { + return delegate.weigh(key.getKey(), value); + } + } + + private static Duration toDuration(long duration, TimeUnit unit) + { + // Saturated conversion, as in com.google.common.cache.CacheBuilder.toNanosSaturated + return Duration.ofNanos(unit.toNanos(duration)); + } +} diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableLoadingCache.java b/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableLoadingCache.java deleted file mode 100644 index 78d7c89ff112..000000000000 --- a/lib/trino-collect/src/main/java/io/trino/collect/cache/EvictableLoadingCache.java +++ /dev/null @@ -1,382 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.collect.cache; - -import com.google.common.cache.AbstractLoadingCache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.CacheStats; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.BiMap; -import com.google.common.collect.HashBiMap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import org.gaul.modernizer_maven_annotations.SuppressModernizer; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static com.google.common.base.Preconditions.checkState; -import static java.lang.String.format; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -/** - * A {@link LoadingCache} implementation similar to ones produced by {@link CacheBuilder#build(CacheLoader)}, - * but one that does not exhibit Guava issue #1881: - * a cache inspection with {@link #getIfPresent(Object)} or {@link #get(Object)} is guaranteed to return fresh - * state after {@link #invalidate(Object)}, {@link #invalidateAll(Iterable)} or {@link #invalidateAll()} were called. - */ -public class EvictableLoadingCache - extends AbstractLoadingCache -{ - public static LoadingCache build( - OptionalLong expiresAfterWriteMillis, - OptionalLong refreshMillis, - long maximumSize, - boolean recordStats, - CacheLoader cacheLoader) - { - requireNonNull(cacheLoader, "cacheLoader is null"); - - CacheBuilder tokenCache = CacheBuilder.newBuilder(); - CacheBuilder dataCache = CacheBuilder.newBuilder(); - if (expiresAfterWriteMillis.isPresent()) { - tokenCache.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); - dataCache.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); - } - - if (refreshMillis.isPresent() && (!expiresAfterWriteMillis.isPresent() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) { - dataCache.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS); - } - - tokenCache.maximumSize(maximumSize); - dataCache.maximumSize(maximumSize); - - if (recordStats) { - dataCache.recordStats(); - } - - return new EvictableLoadingCache<>( - EvictableCache.buildWith(tokenCache), - buildUnsafeCache(dataCache, new TokenCacheLoader<>(cacheLoader))); - } - - @SuppressModernizer // CacheBuilder.build(CacheLoader) is forbidden, advising to use this class as a safety-adding wrapper. - private static LoadingCache buildUnsafeCache(CacheBuilder cacheBuilder, CacheLoader cacheLoader) - { - return cacheBuilder.build(cacheLoader); - } - - // Token is a freshness marker. tokenCache keeps the fresh token for given key. - // Cache invalidation correctness (read-after-invalidate consistency) relies on tokenCache - // invalidation. - private final EvictableCache> tokensCache; - - // dataCache keeps data for tokens, potentially including stale entries. - // dataCache invalidation is opportunistic. It's necessary for releasing memory, but - // is *not* necessary for correctness (read-after-invalidate consistency). 
- private final LoadingCache, V> dataCache; - - private EvictableLoadingCache(EvictableCache> tokensCache, LoadingCache, V> dataCache) - { - this.tokensCache = requireNonNull(tokensCache, "tokensCache is null"); - this.dataCache = requireNonNull(dataCache, "dataCache is null"); - } - - @Override - public V get(K key) - throws ExecutionException - { - Token token = tokensCache.get(key, () -> new Token<>(key)); - return dataCache.get(token); - } - - @Override - public ImmutableMap getAll(Iterable keys) - throws ExecutionException - { - BiMap> keyToToken = HashBiMap.create(); - for (K key : keys) { - // This is not bulk, but is fast local operation - keyToToken.put(key, tokensCache.get(key, () -> new Token<>(key))); - } - - Map, V> values = dataCache.getAll(keyToToken.values()); - - BiMap, K> tokenToKey = keyToToken.inverse(); - ImmutableMap.Builder result = ImmutableMap.builder(); - for (Map.Entry, V> entry : values.entrySet()) { - Token token = entry.getKey(); - - // While token.getKey() returns equal key, a caller may expect us to maintain key identity, in case equal keys are still distinguishable. - K key = tokenToKey.get(token); - checkState(key != null, "No key found for %s in %s when loading %s", token, tokenToKey, keys); - - result.put(key, entry.getValue()); - } - return result.buildOrThrow(); - } - - @Override - public V getIfPresent(Object key) - { - Token token = tokensCache.getIfPresent(key); - if (token == null) { - return null; - } - return dataCache.getIfPresent(token); - } - - @Override - public void invalidate(Object key) - { - Token token = tokensCache.getIfPresent(key); - // Invalidate irrespective of whether we got a token, to invalidate result of ongoing token load (instantiation). - tokensCache.invalidate(key); - if (token != null) { - dataCache.invalidate(token); - } - } - - @Override - public void invalidateAll(Iterable keys) - { - ImmutableMap> tokens = tokensCache.getAllPresent(keys); - tokensCache.invalidateAll(keys); - // This is racy with respect to concurrent cache loading. dataCache is not guaranteed not to have entries for keys after invalidateAll(keys) returns. - // Note that dataCache invalidation is not necessary for correctness. Still one can expect that cache invalidation releases memory. 
- dataCache.invalidateAll(tokens.values()); - } - - @Override - public void invalidateAll() - { - tokensCache.invalidateAll(); - dataCache.invalidateAll(); - } - - @Override - public long size() - { - return dataCache.size(); - } - - @Override - public CacheStats stats() - { - return dataCache.stats(); - } - - @Override - public ConcurrentMap asMap() - { - return new ConcurrentMap() - { - private final ConcurrentMap> tokenCacheMap = tokensCache.asMap(); - private final ConcurrentMap, V> dataCacheMap = dataCache.asMap(); - - @Override - public V putIfAbsent(K key, V value) - { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); - } - - @Override - public boolean remove(Object key, Object value) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean replace(K key, V oldValue, V newValue) - { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); - } - - @Override - public V replace(K key, V value) - { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation"); - } - - @Override - public int size() - { - return dataCache.asMap().size(); - } - - @Override - public boolean isEmpty() - { - return dataCache.asMap().isEmpty(); - } - - @Override - public boolean containsKey(Object key) - { - throw new UnsupportedOperationException(); - } - - @Override - public boolean containsValue(Object value) - { - throw new UnsupportedOperationException(); - } - - @Override - public V get(Object key) - { - return getIfPresent(key); - } - - @Override - public V put(K key, V value) - { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. Use get(key, callable) instead."); - } - - @Override - public V remove(Object key) - { - throw new UnsupportedOperationException(); - } - - @Override - public void putAll(Map m) - { - throw new UnsupportedOperationException("The operation is not supported, as in inherently races with cache invalidation. 
Use get(key, callable) instead."); - } - - @Override - public void clear() - { - dataCacheMap.clear(); - tokenCacheMap.clear(); - } - - @Override - public Set keySet() - { - return tokenCacheMap.keySet(); - } - - @Override - public Collection values() - { - return dataCacheMap.values(); - } - - @Override - public Set> entrySet() - { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public void cleanUp() - { - tokensCache.cleanUp(); - dataCache.cleanUp(); - } - - // instance-based equality - private static class Token - { - private final K key; - - Token(K key) - { - this.key = requireNonNull(key, "key is null"); - } - - K getKey() - { - return key; - } - - @Override - public String toString() - { - return format("Cache Token(%s)", key); - } - } - - private static class TokenCacheLoader - extends CacheLoader, V> - { - private final CacheLoader delegate; - - public TokenCacheLoader(CacheLoader delegate) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - } - - @Override - public V load(Token token) - throws Exception - { - return delegate.load(token.getKey()); - } - - @Override - public ListenableFuture reload(Token token, V oldValue) - throws Exception - { - return delegate.reload(token.getKey(), oldValue); - } - - @Override - public Map, V> loadAll(Iterable> tokens) - throws Exception - { - List> tokenList = ImmutableList.copyOf(tokens); - List keys = new ArrayList<>(); - for (Token token : tokenList) { - keys.add(token.getKey()); - } - Map values = delegate.loadAll(keys); - - ImmutableMap.Builder, V> result = ImmutableMap.builder(); - for (int i = 0; i < tokenList.size(); i++) { - Token token = tokenList.get(i); - K key = keys.get(i); - V value = values.get(key); - // CacheLoader.loadAll is not guaranteed to return values for all the keys - if (value != null) { - result.put(token, value); - } - } - return result.buildOrThrow(); - } - - @Override - public String toString() - { - return toStringHelper(this) - .addValue(delegate) - .toString(); - } - } -} diff --git a/lib/trino-collect/src/main/java/io/trino/collect/cache/MoreFutures.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/MoreFutures.java similarity index 82% rename from lib/trino-collect/src/main/java/io/trino/collect/cache/MoreFutures.java rename to lib/trino-collect/src/test/java/io/trino/collect/cache/MoreFutures.java index d8f468ce8786..1afb81fde291 100644 --- a/lib/trino-collect/src/main/java/io/trino/collect/cache/MoreFutures.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/MoreFutures.java @@ -16,7 +16,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Throwables.propagateIfPossible; import static java.util.Objects.requireNonNull; @@ -24,16 +23,6 @@ final class MoreFutures { private MoreFutures() {} - /** - * Copy of {@code io.airlift.concurrent.MoreFutures#getDone}, see there for documentation. - */ - public static T getDone(Future future) - { - requireNonNull(future, "future is null"); - checkArgument(future.isDone(), "future not done yet"); - return getFutureValue(future); - } - /** * Copy of {@code io.airlift.concurrent.MoreFutures#getFutureValue}, see there for documentation. 
*/ diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java index 238acb8d0729..4b568a2dde09 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableCache.java @@ -13,8 +13,8 @@ */ package io.trino.collect.cache; +import com.google.common.base.Strings; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableList; import org.gaul.modernizer_maven_annotations.SuppressModernizer; @@ -32,9 +32,11 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.collect.cache.CacheStatsAssertions.assertCacheStats; +import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertTrue; @@ -48,17 +50,78 @@ public class TestEvictableCache public void testLoad() throws Exception { - Cache cache = EvictableCache.buildWith( - CacheBuilder.newBuilder()); + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); assertEquals(cache.get(42, () -> "abc"), "abc"); } + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictBySize() + throws Exception + { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(); + + for (int i = 0; i < 10_000; i++) { + int value = i * 10; + assertEquals((Object) cache.get(i, () -> value), value); + } + cache.cleanUp(); + assertEquals(cache.size(), 10); + assertEquals(((EvictableCache) cache).tokensCount(), 10); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictByWeight() + throws Exception + { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumWeight(20) + .weigher((Integer key, String value) -> value.length()) + .build(); + + for (int i = 0; i < 10; i++) { + String value = Strings.repeat("a", i); + assertEquals((Object) cache.get(i, () -> value), value); + } + cache.cleanUp(); + // It's not deterministic which entries get evicted + int cacheSize = toIntExact(cache.size()); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize); + assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20); + assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); + assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testDisabledCache() + throws Exception + { + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .build(); + + for (int i = 0; i < 10; i++) { + int value = i * 10; + assertEquals((Object) cache.get(i, () -> value), value); + } + cache.cleanUp(); + assertEquals(cache.size(), 0); + assertThat(cache.asMap().keySet()).as("keySet").isEmpty(); + assertThat(cache.asMap().values()).as("values").isEmpty(); + } + @Test(timeOut = TEST_TIMEOUT_MILLIS) 
public void testLoadStats() throws Exception { - Cache cache = EvictableCache.buildWith( - CacheBuilder.newBuilder()); + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(); assertEquals(cache.stats(), new CacheStats(0, 0, 0, 0, 0, 0)); @@ -97,7 +160,9 @@ private static Integer newInteger(int value) public void testInvalidateOngoingLoad(Invalidation invalidation) throws Exception { - Cache cache = EvictableCache.buildWith(CacheBuilder.newBuilder()); + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); Integer key = 42; CountDownLatch loadOngoing = new CountDownLatch(1); @@ -164,7 +229,9 @@ public void testInvalidateAndLoadConcurrently(Invalidation invalidation) int[] primes = {2, 3, 5, 7}; AtomicLong remoteState = new AtomicLong(1); - Cache cache = EvictableCache.buildWith(CacheBuilder.newBuilder()); + Cache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(); Integer key = 42; int threads = 4; diff --git a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java index 7c87dd3e529d..66d029816a43 100644 --- a/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java +++ b/lib/trino-collect/src/test/java/io/trino/collect/cache/TestEvictableLoadingCache.java @@ -13,6 +13,7 @@ */ package io.trino.collect.cache; +import com.google.common.base.Strings; import com.google.common.cache.CacheLoader; import com.google.common.cache.CacheStats; import com.google.common.cache.LoadingCache; @@ -24,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -39,6 +39,7 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static com.google.common.collect.Iterables.getOnlyElement; import static io.trino.collect.cache.CacheStatsAssertions.assertCacheStats; +import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; @@ -57,26 +58,77 @@ public class TestEvictableLoadingCache public void testLoad() throws Exception { - LoadingCache cache = EvictableLoadingCache.build( - OptionalLong.empty(), - OptionalLong.empty(), - 10_000, - true, - CacheLoader.from((Integer ignored) -> "abc")); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(CacheLoader.from((Integer ignored) -> "abc")); assertEquals(cache.get(42), "abc"); } + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictBySize() + throws Exception + { + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10) + .build(CacheLoader.from(key -> "abc" + key)); + + for (int i = 0; i < 10_000; i++) { + assertEquals((Object) cache.get(i), "abc" + i); + } + cache.cleanUp(); + assertEquals(cache.size(), 10); + assertEquals(((EvictableCache) cache).tokensCount(), 10); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testEvictByWeight() + throws Exception + { + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumWeight(20) + .weigher((Integer key, String value) -> value.length()) + .build(CacheLoader.from(key -> Strings.repeat("a", key))); + + for (int i = 0; i < 10; i++) { + 
assertEquals((Object) cache.get(i), Strings.repeat("a", i)); + } + cache.cleanUp(); + // It's not deterministic which entries get evicted + int cacheSize = toIntExact(cache.size()); + assertThat(((EvictableCache) cache).tokensCount()).as("tokensCount").isEqualTo(cacheSize); + assertThat(cache.asMap().keySet()).as("keySet").hasSize(cacheSize); + assertThat(cache.asMap().keySet().stream().mapToInt(i -> i).sum()).as("key sum").isLessThanOrEqualTo(20); + assertThat(cache.asMap().values()).as("values").hasSize(cacheSize); + assertThat(cache.asMap().values().stream().mapToInt(String::length).sum()).as("values length sum").isLessThanOrEqualTo(20); + } + + @Test(timeOut = TEST_TIMEOUT_MILLIS) + public void testDisabledCache() + throws Exception + { + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(0) + .build(CacheLoader.from(key -> key * 10)); + + for (int i = 0; i < 10; i++) { + assertEquals((Object) cache.get(i), i * 10); + } + cache.cleanUp(); + assertEquals(cache.size(), 0); + assertThat(cache.asMap().keySet()).as("keySet").isEmpty(); + assertThat(cache.asMap().values()).as("values").isEmpty(); + } + @Test(timeOut = TEST_TIMEOUT_MILLIS) public void testLoadStats() throws Exception { - LoadingCache cache = EvictableLoadingCache.build( - OptionalLong.empty(), - OptionalLong.empty(), - 10_000, - true, - CacheLoader.from((Integer ignored) -> "abc")); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(CacheLoader.from((Integer ignored) -> "abc")); assertEquals(cache.stats(), new CacheStats(0, 0, 0, 0, 0, 0)); @@ -116,12 +168,10 @@ private static Integer newInteger(int value) public void testGetAllMaintainsKeyIdentity() throws Exception { - LoadingCache cache = EvictableLoadingCache.build( - OptionalLong.empty(), - OptionalLong.empty(), - 10_000, - true, - CacheLoader.from(String::length)); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .recordStats() + .build(CacheLoader.from(String::length)); String first = "abc"; String second = new String(first); @@ -155,12 +205,9 @@ public void testInvalidateOngoingLoad(Invalidation invalidation) CountDownLatch invalidated = new CountDownLatch(1); CountDownLatch getReturned = new CountDownLatch(1); - LoadingCache cache = EvictableLoadingCache.build( - OptionalLong.empty(), - OptionalLong.empty(), - 10_000, - false, - new CacheLoader() + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(new CacheLoader() { @Override public String load(Integer key) @@ -234,12 +281,9 @@ public void testInvalidateAndLoadConcurrently(Invalidation invalidation) Map remoteState = new ConcurrentHashMap<>(); remoteState.put(key, new AtomicLong(1)); - LoadingCache cache = EvictableLoadingCache.build( - OptionalLong.empty(), - OptionalLong.empty(), - 10_000, - false, - CacheLoader.from(i -> remoteState.get(i).get())); + LoadingCache cache = EvictableCacheBuilder.newBuilder() + .maximumSize(10_000) + .build(CacheLoader.from(i -> remoteState.get(i).get())); int threads = 4; CyclicBarrier barrier = new CyclicBarrier(threads); diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 105a523ec5aa..fd48e6499af3 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -15,13 +15,12 @@ import 
com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheStats; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.jmx.CacheStatsMBean; import io.airlift.units.Duration; -import io.trino.collect.cache.EvictableCache; +import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.plugin.base.session.SessionPropertiesProvider; import io.trino.plugin.jdbc.IdentityCacheMapping.IdentityCacheKey; import io.trino.spi.TrinoException; @@ -110,7 +109,7 @@ public CachingJdbcClient( this.cacheMissing = cacheMissing; this.identityMapping = requireNonNull(identityMapping, "identityMapping is null"); - CacheBuilder cacheBuilder = CacheBuilder.newBuilder() + EvictableCacheBuilder cacheBuilder = EvictableCacheBuilder.newBuilder() .expireAfterWrite(metadataCachingTtl.toMillis(), MILLISECONDS) .recordStats(); @@ -122,11 +121,11 @@ public CachingJdbcClient( cacheBuilder.maximumSize(cacheMaximumSize); } - schemaNamesCache = EvictableCache.buildWith(cacheBuilder); - tableNamesCache = EvictableCache.buildWith(cacheBuilder); - tableHandleCache = EvictableCache.buildWith(cacheBuilder); - columnsCache = EvictableCache.buildWith(cacheBuilder); - statisticsCache = EvictableCache.buildWith(cacheBuilder); + schemaNamesCache = cacheBuilder.build(); + tableNamesCache = cacheBuilder.build(); + tableHandleCache = cacheBuilder.build(); + columnsCache = cacheBuilder.build(); + statisticsCache = cacheBuilder.build(); } @Override @@ -719,6 +718,15 @@ public int hashCode() { return Objects.hash(tableHandle, tupleDomain); } + + @Override + public String toString() + { + return toStringHelper(this) + .add("tableHandle", tableHandle) + .add("tupleDomain", tupleDomain) + .toString(); + } } @Managed diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java index 157f9f1d3774..16776e6c3ca2 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java @@ -358,7 +358,7 @@ public void testGetTableStatistics() JdbcTableHandle second = createTable(new SchemaTableName(schema, "second")); // load first - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, first, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -368,7 +368,7 @@ public void testGetTableStatistics() }); // load second - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, second, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -382,7 +382,7 @@ public void testGetTableStatistics() JdbcTableHandle secondFirst = createTable(new SchemaTableName(schema, "first")); // load first again - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, secondFirst, 
TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -414,7 +414,7 @@ public void testCacheGetTableStatisticsWithQueryRelationHandle() 0); // load - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, queryOnFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -435,7 +435,7 @@ public void testCacheGetTableStatisticsWithQueryRelationHandle() cachingJdbcClient.dropTable(SESSION, first); // load again - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, queryOnFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); } @@ -449,7 +449,7 @@ public void testTruncateTable() JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); // load - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -462,7 +462,7 @@ public void testTruncateTable() cachingJdbcClient.truncateTable(SESSION, table); // load again - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -502,7 +502,7 @@ public void testCacheEmptyStatistics() ConnectorSession session = createSession("table"); JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(TableStatistics.empty()); }); @@ -521,7 +521,7 @@ public void testGetTableStatisticsDoNotCacheEmptyWhenCachingMissingIsDisabled() ConnectorSession session = createSession("table"); JdbcTableHandle table = createTable(new SchemaTableName(schema, "table")); - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, table, TupleDomain.all())).isEqualTo(TableStatistics.empty()); }); @@ -573,7 +573,7 @@ public void testFlushCache() JdbcTableHandle first = createTable(new SchemaTableName(schema, "atable")); // load table - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, first, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); @@ -587,7 +587,7 @@ public void testFlushCache() JdbcTableHandle secondFirst = createTable(new SchemaTableName(schema, "first")); // load table again - assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(2).afterRunning(() -> { + assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { assertThat(cachingJdbcClient.getTableStatistics(session, 
secondFirst, TupleDomain.all())).isEqualTo(NON_EMPTY_STATS); }); diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java index 62ee49abe612..fdb59a6e8aa6 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/metastore/cache/CachingHiveMetastore.java @@ -23,7 +23,7 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import io.airlift.jmx.CacheStatsMBean; import io.airlift.units.Duration; -import io.trino.collect.cache.EvictableLoadingCache; +import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.plugin.hive.HivePartition; import io.trino.plugin.hive.HiveType; import io.trino.plugin.hive.PartitionNotFoundException; @@ -86,6 +86,7 @@ import static io.trino.plugin.hive.metastore.MetastoreUtil.makePartitionName; import static io.trino.plugin.hive.metastore.PartitionFilter.partitionFilter; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.hadoop.hive.common.FileUtils.makePartName; /** @@ -986,24 +987,24 @@ private static LoadingCache buildCache( Optional refreshExecutor, long maximumSize, StatsRecording statsRecording, - Function loader) + com.google.common.base.Function loader) { - CacheLoader cacheLoader = CacheLoader.from(loader::apply); - + CacheLoader cacheLoader = CacheLoader.from(loader); + EvictableCacheBuilder cacheBuilder = EvictableCacheBuilder.newBuilder(); + if (expiresAfterWriteMillis.isPresent()) { + cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + } checkArgument(refreshMillis.isEmpty() || refreshExecutor.isPresent(), "refreshMillis is provided but refreshExecutor is not"); if (refreshMillis.isPresent() && (expiresAfterWriteMillis.isEmpty() || expiresAfterWriteMillis.getAsLong() > refreshMillis.getAsLong())) { + cacheBuilder.refreshAfterWrite(refreshMillis.getAsLong(), MILLISECONDS); cacheLoader = asyncReloading(cacheLoader, refreshExecutor.orElseThrow(() -> new IllegalArgumentException("Executor not provided"))); } - else { - refreshMillis = OptionalLong.empty(); + cacheBuilder.maximumSize(maximumSize); + if (statsRecording == StatsRecording.ENABLED) { + cacheBuilder.recordStats(); } - return EvictableLoadingCache.build( - expiresAfterWriteMillis, - refreshMillis, - maximumSize, - statsRecording == StatsRecording.ENABLED, - cacheLoader); + return cacheBuilder.build(cacheLoader); } private static LoadingCache buildCache( @@ -1030,13 +1031,18 @@ public Map loadAll(Iterable keys) } }; - return EvictableLoadingCache.build( - expiresAfterWriteMillis, - // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests - OptionalLong.empty(), - maximumSize, - statsRecording == StatsRecording.ENABLED, - cacheLoader); + EvictableCacheBuilder cacheBuilder = EvictableCacheBuilder.newBuilder(); + if (expiresAfterWriteMillis.isPresent()) { + cacheBuilder.expireAfterWrite(expiresAfterWriteMillis.getAsLong(), MILLISECONDS); + } + // cannot use refreshAfterWrite since it can't use the bulk loading and causes too many requests + + cacheBuilder.maximumSize(maximumSize); + if (statsRecording == StatsRecording.ENABLED) { + cacheBuilder.recordStats(); + } + + return cacheBuilder.build(cacheLoader); } // diff --git 
a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java index 4e733341a62c..f1100bfe3e90 100644 --- a/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java +++ b/plugin/trino-mongodb/src/main/java/io/trino/plugin/mongodb/MongoSession.java @@ -15,13 +15,13 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.primitives.Primitives; import com.google.common.primitives.Shorts; import com.google.common.primitives.SignedBytes; +import com.google.common.util.concurrent.UncheckedExecutionException; import com.mongodb.DBRef; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoClient; @@ -32,7 +32,7 @@ import com.mongodb.client.result.DeleteResult; import io.airlift.log.Logger; import io.airlift.slice.Slice; -import io.trino.collect.cache.EvictableCache; +import io.trino.collect.cache.EvictableCacheBuilder; import io.trino.spi.HostAddress; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -134,8 +134,9 @@ public MongoSession(TypeManager typeManager, MongoClient client, MongoClientConf this.cursorBatchSize = config.getCursorBatchSize(); this.implicitPrefix = requireNonNull(config.getImplicitRowFieldPrefix(), "config.getImplicitRowFieldPrefix() is null"); - this.tableCache = EvictableCache.buildWith(CacheBuilder.newBuilder() - .expireAfterWrite(1, MINUTES)); // TODO: Configure + this.tableCache = EvictableCacheBuilder.newBuilder() + .expireAfterWrite(1, MINUTES) // TODO: Configure + .build(); } public void shutdown() @@ -178,7 +179,7 @@ public MongoTable getTable(SchemaTableName tableName) try { return tableCache.get(tableName, () -> loadTableSchema(tableName)); } - catch (ExecutionException e) { + catch (ExecutionException | UncheckedExecutionException e) { throwIfInstanceOf(e.getCause(), TrinoException.class); throw new RuntimeException(e); }
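For readers reviewing this patch, the following is a minimal usage sketch of the `EvictableCacheBuilder` API it introduces, mirroring the call patterns visible in the updated tests and call sites (`maximumSize`, `expireAfterWrite`, `recordStats`, `build()`, and `build(CacheLoader)`). The example class name, key/value types, and literal values are illustrative assumptions and not part of the patch.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.trino.collect.cache.EvictableCacheBuilder;

import java.util.concurrent.TimeUnit;

public class EvictableCacheBuilderExample
{
    public static void main(String[] args)
            throws Exception
    {
        // Plain Cache: each value is computed on demand through Cache.get(key, valueLoader).
        Cache<Integer, String> cache = EvictableCacheBuilder.newBuilder()
                .maximumSize(10_000)
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .recordStats()
                .build();
        String fromCallable = cache.get(42, () -> "abc");

        // LoadingCache: the CacheLoader is attached at build time, as with CacheBuilder.build(CacheLoader).
        LoadingCache<Integer, String> loadingCache = EvictableCacheBuilder.newBuilder()
                .maximumSize(10_000)
                .build(CacheLoader.from((Integer key) -> "value-" + key));
        String loaded = loadingCache.get(42);

        // The point of the evictable implementations: a lookup after invalidate() is
        // guaranteed not to observe the invalidated entry (Guava issue #1881).
        loadingCache.invalidate(42);
        String reloaded = loadingCache.get(42);

        System.out.println(fromCallable + " " + loaded + " " + reloaded);
    }
}

Note that some bound (maximumSize, maximumWeight, or expireAfterWrite) is required: per the builder's build() method in this patch, an unbounded cache is rejected, because an invalidation racing with a load can leave an entry without an associated token that only eviction would reclaim.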