From b72d9cb264f8e23cff13f724eee97e36f6325a61 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sat, 2 Jan 2021 23:23:26 -0800 Subject: [PATCH] Return refresh future and ignore redundant refreshes A mapping of in-flight refreshes is now maintained and lazily initialized if not used. This allows the cache to ignore redundant requests for reloads, like Guava does. It also removes disablement of expiration during refresh and resolves an ABA problem if the entry is modified in a previously undectectable way. The refresh future can now be obtained from LoadingCache to chain operations against. fixes #143 fixes #193 fixes #236 fixes #282 fixes #322 fixed #373 fixes #467 --- .../caffeine/cache/BoundedLocalCache.java | 131 ++++++++++++------ .../benmanes/caffeine/cache/LoadingCache.java | 7 +- .../cache/LocalAsyncLoadingCache.java | 109 ++++++++++++--- .../benmanes/caffeine/cache/LocalCache.java | 7 + .../caffeine/cache/LocalLoadingCache.java | 99 ++++++++----- .../caffeine/cache/UnboundedLocalCache.java | 47 ++++++- .../caffeine/cache/ExpirationTest.java | 6 +- .../caffeine/cache/LoadingCacheTest.java | 49 +++++-- .../caffeine/cache/MultiThreadedTest.java | 4 +- .../caffeine/cache/ReferenceTest.java | 4 +- .../caffeine/cache/RefreshAfterWriteTest.java | 13 +- .../benmanes/caffeine/cache/Stresser.java | 1 + .../caffeine/cache/issues/Issue193Test.java | 106 ++++++++++++++ .../cache/testing/GuavaCacheFromContext.java | 29 +++- .../caffeine/cache/testing/HasStats.java | 13 +- 15 files changed, 495 insertions(+), 130 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 59c557ce23..0a73697373 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -60,6 +60,7 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -220,10 +221,10 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef final Executor executor; final boolean isAsync; - // The collection views - @Nullable transient Set keySet; - @Nullable transient Collection values; - @Nullable transient Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine builder, executor = builder.getExecutor(); writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); + refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); @@ -288,11 +290,29 @@ public final Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); } + @Override + public Object referenceKey(K key) { + return nodeFactory.newLookupKey(key); + } + /* --------------- Stats Support --------------- */ @Override @@ -899,8 +919,9 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { boolean[] removed = new boolean[1]; boolean[] resurrect = new boolean[1]; RemovalCause[] actualCause = new RemovalCause[1]; + Object keyReference = node.getKeyReference(); - data.computeIfPresent(node.getKeyReference(), (k, n) -> { + data.computeIfPresent(keyReference, (k, n) -> { if (n != node) { return n; } @@ -965,6 +986,12 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(keyReference); + } + if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1172,51 +1199,60 @@ void refreshIfNeeded(Node node, long now) { if (!refreshAfterWrite()) { return; } + K key; V oldValue; - long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = (now + ASYNC_EXPIRY); - if (((now - oldWriteTime) > refreshAfterWriteNanos()) + long writeTime = node.getWriteTime(); + Object keyReference = node.getKeyReference(); + if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) - && node.casWriteTime(oldWriteTime, refreshWriteTime)) { - try { - CompletableFuture refreshFuture; - long startTime = statsTicker().read(); + && !refreshes().containsKey(keyReference)) { + long[] startTime = new long[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] refreshFuture = new CompletableFuture[1]; + refreshes().computeIfAbsent(keyReference, k -> { + startTime[0] = statsTicker().read(); if (isAsync) { @SuppressWarnings("unchecked") CompletableFuture future = (CompletableFuture) oldValue; if (Async.isReady(future)) { @SuppressWarnings("NullAway") - CompletableFuture refresh = future.thenCompose(value -> - cacheLoader.asyncReload(key, value, executor)); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, future.join(), executor); + refreshFuture[0] = refresh; } else { // no-op if load is pending - node.casWriteTime(refreshWriteTime, oldWriteTime); - return; + return future; } } else { @SuppressWarnings("NullAway") - CompletableFuture refresh = cacheLoader.asyncReload(key, oldValue, executor); - refreshFuture = refresh; + var refresh = cacheLoader.asyncReload(key, oldValue, executor); + refreshFuture[0] = refresh; } - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = statsTicker().read() - startTime; + return refreshFuture[0]; + }); + + if (refreshFuture[0] != null) { + refreshFuture[0].whenComplete((newValue, error) -> { + long loadTime = statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - node.casWriteTime(refreshWriteTime, oldWriteTime); + refreshes().remove(keyReference, refreshFuture[0]); statsCounter().recordLoadFailure(loadTime); return; } @SuppressWarnings("unchecked") - V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue; + V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue; boolean[] discard = new boolean[1]; compute(key, (k, currentValue) -> { if (currentValue == null) { - return value; - } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) { + if (value == null) { + return null; + } else if (refreshes().get(key) == refreshFuture[0]) { + return value; + } + } else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) { return value; } discard[0] = true; @@ -1231,10 +1267,9 @@ void refreshIfNeeded(Node node, long now) { } else { statsCounter().recordLoadSuccess(loadTime); } + + refreshes().remove(keyReference, refreshFuture[0]); }); - } catch (Throwable t) { - node.casWriteTime(refreshWriteTime, oldWriteTime); - logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t); } } } @@ -1782,8 +1817,12 @@ public void clear() { } // Discard all entries - for (Node node : data.values()) { - removeNode(node, now); + var pending = refreshes.get(); + for (var entry : data.entrySet()) { + removeNode(entry.getValue(), now); + if (pending != null) { + pending.remove(entry.getKey()); + } } // Discard all pending reads @@ -2099,8 +2138,9 @@ public Map getAllPresent(Iterable keys) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> { + data.computeIfPresent(lookupKey, (k, n) -> { synchronized (n) { oldValue[0] = n.getValue(); if (oldValue[0] == null) { @@ -2118,6 +2158,11 @@ public Map getAllPresent(Iterable keys) { }); if (cause[0] != null) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2140,8 +2185,9 @@ public boolean remove(Object key, Object value) { @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; RemovalCause[] cause = new RemovalCause[1]; + Object lookupKey = nodeFactory.newLookupKey(key); - data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> { + data.computeIfPresent(lookupKey, (kR, node) -> { synchronized (node) { oldKey[0] = node.getKey(); oldValue[0] = node.getValue(); @@ -2163,7 +2209,13 @@ public boolean remove(Object key, Object value) { if (removed[0] == null) { return false; - } else if (hasRemovalListener()) { + } + + var pending = refreshes.get(); + if (pending != null) { + pending.remove(lookupKey); + } + if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } afterWrite(new RemovalTask(removed[0])); @@ -2582,15 +2634,14 @@ public void replaceAll(BiFunction function) { if (expiresAfterWrite() || (weightedDifference != 0)) { afterWrite(new UpdateTask(node, weightedDifference)); } else { - if (cause[0] == null) { - if (!isComputingAsync(node)) { - tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); - setAccessTime(node, now[0]); - } - } else if (cause[0] == RemovalCause.COLLECTED) { - scheduleDrainBuffers(); + if ((cause[0] == null) && !isComputingAsync(node)) { + tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]); + setAccessTime(node, now[0]); } afterRead(node, now[0], /* recordHit */ false); + if ((cause[0] != null) && cause[0].wasEvicted()) { + scheduleDrainBuffers(); + } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java index c0db010547..81c37db5a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LoadingCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import org.checkerframework.checker.nullness.qual.NonNull; @@ -101,9 +102,13 @@ public interface LoadingCache extends Cache { * Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache * currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading * is asynchronous by delegating to the default executor. + *

+ * Returns an existing future without doing anything if another thread is currently loading the + * value for {@code key}. * * @param key key with which a value may be associated + * @return the future that is loading the value * @throws NullPointerException if the specified key is null */ - void refresh(@NonNull K key); + CompletableFuture refresh(@NonNull K key); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index b11a7dc500..f9e01979a9 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -129,45 +129,109 @@ public Map getAll(Iterable keys) { } @Override - @SuppressWarnings("FutureReturnValueIgnored") - public void refresh(K key) { + public CompletableFuture refresh(K key) { requireNonNull(key); - long[] writeTime = new long[1]; - CompletableFuture oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, writeTime); + Object keyReference = asyncCache.cache().referenceKey(key); + for (;;) { + var future = tryOptimisticRefresh(key, keyReference); + if (future == null) { + future = tryComputeRefresh(key, keyReference); + } + if (future != null) { + return future; + } + } + } + + /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */ + private @Nullable CompletableFuture tryOptimisticRefresh(K key, Object keyReference) { + // If a refresh is in-flight, then return it directly. If completed and not yet removed, then + // remove to trigger a new reload. + @SuppressWarnings("unchecked") + var lastRefresh = (CompletableFuture) asyncCache.cache().refreshes().get(keyReference); + if (lastRefresh != null) { + if (Async.isReady(lastRefresh)) { + asyncCache.cache().refreshes().remove(keyReference, lastRefresh); + } else { + return lastRefresh; + } + } + + // If the entry is absent then perform a new load, else if in-flight then return it + var oldValueFuture = asyncCache.cache().getIfPresentQuietly(key, /* writeTime */ new long[1]); if ((oldValueFuture == null) || (oldValueFuture.isDone() && oldValueFuture.isCompletedExceptionally())) { - asyncCache.get(key, asyncCache.loader::asyncLoad, /* recordStats */ false); - return; + if (oldValueFuture != null) { + asyncCache.cache().remove(key, asyncCache); + } + var future = asyncCache.get(key, + asyncCache.loader::asyncLoad, /* recordStats */ false); + @SuppressWarnings("unchecked") + var prior = (CompletableFuture) asyncCache.cache() + .refreshes().putIfAbsent(keyReference, future); + return (prior == null) ? future : prior; } else if (!oldValueFuture.isDone()) { // no-op if load is pending - return; + return oldValueFuture; } - oldValueFuture.thenAccept(oldValue -> { - long now = asyncCache.cache().statsTicker().read(); - CompletableFuture refreshFuture = (oldValue == null) - ? asyncCache.loader.asyncLoad(key, asyncCache.cache().executor()) - : asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = asyncCache.cache().statsTicker().read() - now; + // Fallback to the slow path, possibly retrying + return null; + } + + /** Begins a refresh if the entry has materialized and no reload is in-flight. */ + @SuppressWarnings("FutureReturnValueIgnored") + private @Nullable CompletableFuture tryComputeRefresh(K key, Object keyReference) { + long[] startTime = new long[1]; + long[] writeTime = new long[1]; + boolean[] refreshed = new boolean[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] oldValueFuture = new CompletableFuture[1]; + var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> { + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + V oldValue = Async.getIfReady(oldValueFuture[0]); + if (oldValue == null) { + return null; + } + + refreshed[0] = true; + startTime[0] = asyncCache.cache().statsTicker().read(); + return asyncCache.loader.asyncReload(key, oldValue, asyncCache.cache().executor()); + }); + + if (future == null) { + // Retry the optimistic path + return null; + } + + @SuppressWarnings("unchecked") + var castedFuture = (CompletableFuture) future; + if (refreshed[0]) { + castedFuture.whenComplete((newValue, error) -> { + long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { - asyncCache.cache().statsCounter().recordLoadFailure(loadTime); logger.log(Level.WARNING, "Exception thrown during refresh", error); + asyncCache.cache().refreshes().remove(keyReference, castedFuture); + asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (k, currentValue) -> { + asyncCache.cache().compute(key, (ignored, currentValue) -> { if (currentValue == null) { - return (newValue == null) ? null : refreshFuture; - } else if (currentValue == oldValueFuture) { + if (newValue == null) { + return null; + } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { + return castedFuture; + } + } else if (currentValue == oldValueFuture[0]) { long expectedWriteTime = writeTime[0]; if (asyncCache.cache().hasWriteTime()) { asyncCache.cache().getIfPresentQuietly(key, writeTime); } if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : refreshFuture; + return (newValue == null) ? null : castedFuture; } } discard[0] = true; @@ -175,15 +239,18 @@ public void refresh(K key) { }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, refreshFuture, RemovalCause.REPLACED); + asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } + + asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); - }); + } + return castedFuture; } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index 9ce22216c2..206055808a 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,6 +53,9 @@ interface LocalCache extends ConcurrentMap { /** Returns the {@link Executor} used by this cache. */ @NonNull Executor executor(); + /** Returns the map of in-flight refresh operations. */ + ConcurrentMap> refreshes(); + /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); @@ -64,6 +68,9 @@ interface LocalCache extends ConcurrentMap { /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); + /** Returns the reference key. */ + Object referenceKey(K key); + /** * See {@link Cache#getIfPresent(Object)}. This method differs by accepting a parameter of whether * to record the hit and miss statistics based on the success of this operation. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 5e12bdf397..ccc265f986 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -89,49 +89,78 @@ default Map loadSequentially(Iterable keys) { @Override @SuppressWarnings("FutureReturnValueIgnored") - default void refresh(K key) { + default CompletableFuture refresh(K key) { requireNonNull(key); long[] writeTime = new long[1]; - long startTime = cache().statsTicker().read(); - V oldValue = cache().getIfPresentQuietly(key, writeTime); - CompletableFuture refreshFuture = (oldValue == null) - ? cacheLoader().asyncLoad(key, cache().executor()) - : cacheLoader().asyncReload(key, oldValue, cache().executor()); - refreshFuture.whenComplete((newValue, error) -> { - long loadTime = cache().statsTicker().read() - startTime; - if (error != null) { - logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().statsCounter().recordLoadFailure(loadTime); - return; + long[] startTime = new long[1]; + @SuppressWarnings("unchecked") + V[] oldValue = (V[]) new Object[1]; + @SuppressWarnings({"unchecked", "rawtypes"}) + CompletableFuture[] reloading = new CompletableFuture[1]; + Object keyReference = cache().referenceKey(key); + + var future = cache().refreshes().compute(keyReference, (k, existing) -> { + if ((existing != null) && !Async.isReady(existing)) { + return existing; } - boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - return newValue; - } else if (currentValue == oldValue) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + startTime[0] = cache().statsTicker().read(); + oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + CompletableFuture refreshFuture = (oldValue[0] == null) + ? cacheLoader().asyncLoad(key, cache().executor()) + : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); + reloading[0] = refreshFuture; + return refreshFuture; + }); + + if (reloading[0] != null) { + reloading[0].whenComplete((newValue, error) -> { + long loadTime = cache().statsTicker().read() - startTime[0]; + if (error != null) { + logger.log(Level.WARNING, "Exception thrown during refresh", error); + cache().refreshes().remove(keyReference, reloading[0]); + cache().statsCounter().recordLoadFailure(loadTime); + return; + } + + boolean[] discard = new boolean[1]; + cache().compute(key, (k, currentValue) -> { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (cache().refreshes().get(keyReference) == reloading[0]) { + return newValue; + } + } else if (currentValue == oldValue[0]) { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } + discard[0] = true; + return currentValue; + }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); + + if (discard[0] && cache().hasRemovalListener()) { + cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + } + if (newValue == null) { + cache().statsCounter().recordLoadFailure(loadTime); + } else { + cache().statsCounter().recordLoadSuccess(loadTime); } - discard[0] = true; - return currentValue; - }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); - } - if (newValue == null) { - cache().statsCounter().recordLoadFailure(loadTime); - } else { - cache().statsCounter().recordLoadSuccess(loadTime); - } - }); + cache().refreshes().remove(keyReference, reloading[0]); + }); + } + + @SuppressWarnings("unchecked") + CompletableFuture castedFuture = (CompletableFuture) future; + return castedFuture; } /** Returns a mapping function that adapts to {@link CacheLoader#load}. */ diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 70e12f2fb7..6974bd5770 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -61,15 +62,17 @@ final class UnboundedLocalCache implements LocalCache { final Executor executor; final Ticker ticker; - transient @Nullable Set keySet; - transient @Nullable Collection values; - transient @Nullable Set> entrySet; + @Nullable Set keySet; + @Nullable Collection values; + @Nullable Set> entrySet; + AtomicReference>> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); this.statsCounter = builder.getStatsCounterSupplier().get(); this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); + this.refreshes = new AtomicReference<>(); this.writer = builder.getCacheWriter(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); @@ -80,6 +83,11 @@ public boolean hasWriteTime() { return false; } + @Override + public Object referenceKey(K key) { + return key; + } + /* --------------- Cache --------------- */ @Override @@ -166,6 +174,19 @@ public Executor executor() { return executor; } + @Override + @SuppressWarnings("NullAway") + public ConcurrentMap> refreshes() { + var pending = refreshes.get(); + if (pending == null) { + pending = new ConcurrentHashMap<>(); + if (!refreshes.compareAndSet(null, pending)) { + pending = refreshes.get(); + } + } + return pending; + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -338,6 +359,11 @@ public int size() { public void clear() { if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { data.clear(); + + var pending = refreshes.get(); + if (pending != null) { + pending.clear(); + } return; } for (K key : data.keySet()) { @@ -430,6 +456,10 @@ public void putAll(Map map) { }); } + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -459,9 +489,16 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (hasRemovalListener() && removed) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + if (removed) { + var pending = refreshes.get(); + if (pending != null) { + pending.remove(key); + } + if (hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); + } } + return removed; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 8852b361f2..f701dfcb76 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -602,9 +602,9 @@ public void getAll_writerFails(LoadingCache cache, CacheContex public void refresh(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.MINUTES); Integer key = context.firstKey(); - cache.refresh(key); + cache.refresh(key).join(); - long count = (cache.estimatedSize() == 1) ? context.initialSize() : 1; + long count = context.initialSize(); assertThat(cache, hasRemovalNotifications(context, count, RemovalCause.EXPIRED)); verifyWriter(context, (verifier, writer) -> { verifier.deleted(key, context.original().get(key), RemovalCause.EXPIRED); @@ -621,7 +621,7 @@ public void refresh(LoadingCache cache, CacheContext context) compute = Compute.SYNC, writer = Writer.EXCEPTIONAL, removalListener = Listener.REJECTING) public void refresh_writerFails(LoadingCache cache, CacheContext context) { context.ticker().advance(1, TimeUnit.HOURS); - cache.refresh(context.firstKey()); + cache.refresh(context.firstKey()).join(); context.disableRejectingCacheWriter(); context.ticker().advance(-1, TimeUnit.HOURS); assertThat(cache.asMap(), equalTo(context.original())); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 920ddf41d9..4f7ed1e1e9 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -27,6 +27,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -56,6 +57,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Maximum; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ReferenceType; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Writer; @@ -361,7 +363,7 @@ class Key { @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) @Test(dataProvider = "caches", expectedExceptions = NullPointerException.class) public void refresh_null(LoadingCache cache, CacheContext context) { - cache.refresh(null); + cache.refresh(null).join(); } @CheckNoWriter @@ -370,7 +372,8 @@ public void refresh_null(LoadingCache cache, CacheContext cont executor = CacheExecutor.DIRECT, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { - cache.refresh(context.firstKey()); + var future = cache.refresh(context.firstKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize() - 1)); assertThat(cache.getIfPresent(context.firstKey()), is(nullValue())); assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); @@ -382,9 +385,11 @@ public void refresh_remove(LoadingCache cache, CacheContext co removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_failure(LoadingCache cache, CacheContext context) { - // Shouldn't leak exception to caller and should retain stale entry - cache.refresh(context.absentKey()); - cache.refresh(context.firstKey()); + // Shouldn't leak exception to caller and should retain the stale entry + var future1 = cache.refresh(context.absentKey()); + var future2 = cache.refresh(context.firstKey()); + assertThat(future1.isCompletedExceptionally(), is(true)); + assertThat(future2.isCompletedExceptionally(), is(true)); assertThat(cache.estimatedSize(), is(context.initialSize())); assertThat(context, both(hasLoadSuccessCount(0)).and(hasLoadFailureCount(2))); } @@ -393,15 +398,23 @@ public void refresh_failure(LoadingCache cache, CacheContext c @CacheSpec(loader = Loader.NULL) @Test(dataProvider = "caches") public void refresh_absent_null(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); assertThat(cache.estimatedSize(), is(context.initialSize())); } @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(removalListener = { Listener.DEFAULT, Listener.REJECTING }) + @CacheSpec( + maximumSize = Maximum.UNREACHABLE, + removalListener = { Listener.DEFAULT, Listener.REJECTING }, population = Population.SINGLETON) public void refresh_absent(LoadingCache cache, CacheContext context) { - cache.refresh(context.absentKey()); + Integer key = context.absentKey(); + var future = cache.refresh(key); + if (cache.asMap().size() == 1) { + assertThat(cache.asMap(), hasKey(key)); + } + assertThat(future.join(), is(not(nullValue()))); assertThat(cache.estimatedSize(), is(1 + context.initialSize())); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); @@ -416,7 +429,8 @@ public void refresh_absent(LoadingCache cache, CacheContext co population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_present_null(LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(nullValue())); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -435,7 +449,8 @@ public void refresh_present_null(LoadingCache cache, CacheCont public void refresh_present_sameValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(context.original().get(key))); } int count = context.firstMiddleLastKeys().size(); assertThat(context, both(hasMissCount(0)).and(hasHitCount(0))); @@ -455,7 +470,9 @@ public void refresh_present_sameValue( public void refresh_present_differentValue( LoadingCache cache, CacheContext context) { for (Integer key : context.firstMiddleLastKeys()) { - cache.refresh(key); + var future = cache.refresh(key); + assertThat(future.join(), is(key)); + // records a hit assertThat(cache.get(key), is(key)); } @@ -481,10 +498,12 @@ public void refresh_conflict(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); assertThat(cache.asMap().put(key, updated), is(original)); refresh.set(true); + future.join(); + await().until(() -> context.consumedNotifications().size(), is(2)); List removed = context.consumedNotifications().stream() .map(RemovalNotification::getValue).collect(toList()); @@ -509,11 +528,13 @@ public void refresh_invalidate(CacheContext context) { }); cache.put(key, original); - cache.refresh(key); + var future = cache.refresh(key); cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + future.join(); + + assertThat(cache.getIfPresent(key), is(nullValue())); await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java index 121899daaf..9c7e2f97b2 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/MultiThreadedTest.java @@ -94,8 +94,8 @@ public void async_concurrent_bounded( Threads.runTest(cache, asyncOperations); } - @SuppressWarnings( - {"unchecked", "rawtypes", "ReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) + @SuppressWarnings({"unchecked", "rawtypes", "ReturnValueIgnored", + "FutureReturnValueIgnored", "SizeGreaterThanOrEqualsZero", "SelfEquals"}) List, Integer>> operations = ImmutableList.of( // LoadingCache (cache, key) -> { cache.get(key); }, diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java index 5dbddf0042..a37b752383 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ReferenceTest.java @@ -465,7 +465,7 @@ public void refresh(LoadingCache cache, CacheContext context) GcFinalization.awaitFullGc(); awaitFullCleanup(cache); - cache.refresh(key); + cache.refresh(key).join(); assertThat(cache.estimatedSize(), is(1L)); assertThat(cache.getIfPresent(key), is(not(value))); @@ -487,7 +487,7 @@ public void refresh_writerFails(LoadingCache cache, CacheConte Integer key = context.firstKey(); context.clear(); GcFinalization.awaitFullGc(); - cache.refresh(key); + cache.refresh(key).join(); context.disableRejectingCacheWriter(); assertThat(cache.asMap().isEmpty(), is(false)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 3c774539d2..9866683068 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -351,7 +351,18 @@ public void invalidate(CacheContext context) { cache.invalidate(key); refresh.set(true); - await().until(() -> cache.getIfPresent(key), is(refreshed)); + var executor = (TrackingExecutor) context.executor(); + await().until(() -> executor.submitted() == executor.completed()); + + if (context.implementation() == Implementation.Guava) { + // Guava does not protect against ABA when the entry was removed by allowing a possibly + // stale value from being inserted. + assertThat(cache.getIfPresent(key), is(refreshed)); + } else { + // Maintain linearizability by discarding the refresh if completing after an explicit removal + assertThat(cache.getIfPresent(key), is(nullValue())); + } + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index bd55c04390..89ae2b128b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -90,6 +90,7 @@ public Stresser() { status(); } + @SuppressWarnings("FutureReturnValueIgnored") public void run() throws InterruptedException { ConcurrentTestHarness.timeTasks(operation.maxThreads, () -> { int index = ThreadLocalRandom.current().nextInt(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java new file mode 100644 index 0000000000..517c876dbb --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue193Test.java @@ -0,0 +1,106 @@ +/* + * Copyright 2021 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.testing.FakeTicker; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFutureTask; +import com.google.common.util.concurrent.testing.TestingExecutors; + +/** + * Issue #193: Invalidate before Refresh completes still stores value + *

+ * When a refresh starts before an invalidate and completes afterwards, the entry is inserted into + * the cache. This breaks linearizability assumptions, as the invalidation may be to ensure that + * the cache does not hold stale data that refresh will have observed in its load. This undesirable + * behavior is also present in Guava, so the stricter handling is an intentional deviation. + * + * @author boschb (Robert Bosch) + */ +public final class Issue193Test { + private final AtomicLong counter = new AtomicLong(0); + private final FakeTicker ticker = new FakeTicker(); + + private ListenableFutureTask loadingTask; + + private final AsyncCacheLoader loader = + (key, exec) -> { + // Fools the cache into thinking there is a future that's not immediately ready. + // (The Cache has optimizations for this that we want to avoid) + loadingTask = ListenableFutureTask.create(counter::getAndIncrement); + var f = new CompletableFuture(); + loadingTask.addListener(() -> { + f.complete(Futures.getUnchecked(loadingTask)); + }, exec); + return f; + }; + + private final String key = Issue193Test.class.getSimpleName(); + + /** This ensures that any outstanding async loading is completed as well */ + private long loadGet(AsyncLoadingCache cache, String key) + throws InterruptedException, ExecutionException { + CompletableFuture future = cache.get(key); + if (!loadingTask.isDone()) { + loadingTask.run(); + } + return future.get(); + } + + @Test + public void invalidateDuringRefreshRemovalCheck() throws Exception { + List removed = new ArrayList<>(); + AsyncLoadingCache cache = + Caffeine.newBuilder() + .ticker(ticker::read) + .executor(TestingExecutors.sameThreadScheduledExecutor()) + .removalListener((key, value, reason) -> removed.add(value)) + .refreshAfterWrite(10, TimeUnit.NANOSECONDS) + .buildAsync(loader); + + // Load so there is an initial value. + assertThat(loadGet(cache, key), is(0L)); + + ticker.advance(11); // Refresh should fire on next access + assertThat(cache.synchronous().getIfPresent(key), is(0L)); // Old value + + cache.synchronous().invalidate(key); // Invalidate key entirely + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // No value in cache (good) + loadingTask.run(); // Completes refresh + + // FIXME: java.lang.AssertionError: Not true that <1> is null + assertThat(cache.synchronous().getIfPresent(key), is(nullValue())); // Value in cache (bad) + + // FIXME: Maybe? This is what I wanted to actually test :) + assertThat(removed, is(List.of(0L, 1L))); // 1L was sent to removalListener anyways + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java index 886e75f19d..135c52c6a7 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/GuavaCacheFromContext.java @@ -27,6 +27,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; @@ -65,6 +66,7 @@ */ @SuppressWarnings("PreferJavaTimeOverload") public final class GuavaCacheFromContext { + private static final ThreadLocal error = new ThreadLocal<>(); private GuavaCacheFromContext() {} @@ -489,8 +491,19 @@ public Map getAll(Iterable keys) { } @Override - public void refresh(K key) { + public CompletableFuture refresh(K key) { + error.set(null); cache.refresh(key); + + var e = error.get(); + if (e == null) { + return CompletableFuture.completedFuture(cache.asMap().get(key)); + } else if (e instanceof CacheMissException) { + return CompletableFuture.completedFuture(null); + } + + error.remove(); + return CompletableFuture.failedFuture(e); } } @@ -542,11 +555,17 @@ static class SingleLoader extends CacheLoader implements Serializabl @Override public V load(K key) throws Exception { - V value = delegate.load(key); - if (value == null) { - throw new CacheMissException(); + try { + error.set(null); + V value = delegate.load(key); + if (value == null) { + throw new CacheMissException(); + } + return value; + } catch (Exception e) { + error.set(e); + throw e; } - return value; } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java index cc4ea9bbd0..82fa17ac80 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasStats.java @@ -39,6 +39,8 @@ private enum StatsType { final long count; final StatsType type; + + boolean describe; DescriptionBuilder desc; private HasStats(StatsType type, long count) { @@ -67,8 +69,14 @@ protected boolean matchesSafely(CacheContext context, Description description) { } } - CacheStats stats = context.stats(); desc = new DescriptionBuilder(description); + boolean matches = matches(context); + describe = true; + return matches; + } + + private boolean matches(CacheContext context) throws AssertionError { + CacheStats stats = context.stats(); switch (type) { case HIT: return awaitStatistic(context, stats::hitCount); @@ -95,6 +103,9 @@ private boolean awaitStatistic(CacheContext context, Callable statistic) { await().pollInSameThread().until(statistic, is(count)); return true; } + if (describe) { + is(count).describeMismatch(statistic.call(), desc.getDescription()); + } return false; } catch (Exception e) { return desc.expectThat(type.name(), statistic, is(count)).matches();