From b5fd27cdc09354d06d6484e1fac7985fc91deee6 Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Sun, 10 Jan 2021 13:15:41 -0800 Subject: [PATCH] Improved refresh linearizability (#193) An in-flight refresh is now discarded for any write to the entry, such as an update, removal, or eviction. In the case where the entry was removed and the refresh was discarded, the removal cause is now "explicit" instead of "replaced" as there is no mapping. Guava and previous releases would populate the cache, or at best used "replaced" as the cause even if it was not accurate. The resulting behavior is more pessimistic by trying to avoid a refresh from populating the cache with stale data. In practice this should have little to no penalty. --- .../caffeine/cache/BoundedLocalCache.java | 61 ++++---- .../cache/LocalAsyncLoadingCache.java | 39 ++--- .../caffeine/cache/LocalLoadingCache.java | 39 ++--- .../benmanes/caffeine/cache/RemovalCause.java | 1 + .../caffeine/cache/UnboundedLocalCache.java | 64 ++++---- .../caffeine/cache/LoadingCacheTest.java | 138 +++++++++++++++++- .../caffeine/cache/RefreshAfterWriteTest.java | 31 ++-- .../cache/testing/RemovalListeners.java | 2 +- 8 files changed, 259 insertions(+), 116 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index e7b9996669..5dd70490ad 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -62,7 +62,6 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -211,6 +210,8 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1); /** The maximum duration before an entry expires. */ static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years + /** The handle for the in-flight refresh operations. */ + static final VarHandle REFRESHES; final ConcurrentHashMap> data; @Nullable final CacheLoader cacheLoader; @@ -227,7 +228,7 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -236,7 +237,6 @@ protected BoundedLocalCache(Caffeine builder, this.cacheLoader = cacheLoader; executor = builder.getExecutor(); evictionLock = new ReentrantLock(); - refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); writer = builder.getCacheWriter(isAsync); drainBuffersTask = new PerformCleanupTask(this); @@ -252,6 +252,15 @@ protected BoundedLocalCache(Caffeine builder, } } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(BoundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + /* --------------- Shared --------------- */ /** Returns if the node's value is currently being computed, asynchronously. 
*/ @@ -296,16 +305,24 @@ public final Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. */ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); @@ -966,6 +983,7 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { writer.delete(key, value[0], actualCause[0]); makeDead(n); } + discardRefresh(keyReference); removed[0] = true; return null; }); @@ -995,12 +1013,6 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); - - var pending = refreshes.get(); - if (pending != null) { - pending.remove(keyReference); - } - if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1848,12 +1860,8 @@ public void clear() { } // Discard all entries - var pending = refreshes.get(); for (var entry : data.entrySet()) { removeNode(entry.getValue(), now); - if (pending != null) { - pending.remove(entry.getKey()); - } } // Discard all pending reads @@ -1889,6 +1897,8 @@ void removeNode(Node node, long now) { if (key != null) { writer.delete(key, value[0], cause[0]); } + + discardRefresh(node.getKeyReference()); makeDead(n); return null; } @@ -2064,6 +2074,7 @@ public Map getAllPresent(Iterable keys) { Node computed = node; prior = data.computeIfAbsent(node.getKeyReference(), k -> { writer.write(key, value); + discardRefresh(k); return computed; }); if (prior == node) { @@ -2088,6 +2099,8 @@ public Map getAllPresent(Iterable keys) { afterRead(prior, now, /* recordHit */ false); return currentValue; } + } else { + discardRefresh(prior.getKeyReference()); } V oldValue; @@ -2184,16 +2197,12 @@ public Map getAllPresent(Iterable keys) { writer.delete(castKey, oldValue[0], cause[0]); n.retire(); } + discardRefresh(lookupKey); node[0] = n; return null; }); if (cause[0] != null) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } - afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2232,6 +2241,7 @@ public boolean remove(Object key, Object value) { return node; } writer.delete(oldKey[0], oldValue[0], cause[0]); + discardRefresh(lookupKey); removed[0] = node; node.retire(); return null; @@ -2242,10 +2252,6 @@ public boolean remove(Object key, Object value) { return false; } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } @@ -2286,6 +2292,7 @@ public boolean remove(Object key, Object value) { setVariableTime(n, varTime); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2342,6 +2349,7 @@ public boolean replace(K key, V oldValue, V newValue) { setAccessTime(n, now[0]); setWriteTime(n, 
now[0]); replaced[0] = true; + discardRefresh(k); } return n; }); @@ -2464,6 +2472,7 @@ public void replaceAll(BiFunction function) { setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0])); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2620,6 +2629,7 @@ public void replaceAll(BiFunction function) { if (newValue[0] == null) { if (cause[0] == null) { cause[0] = RemovalCause.EXPLICIT; + discardRefresh(kr); } removed[0] = n; n.retire(); @@ -2641,6 +2651,7 @@ public void replaceAll(BiFunction function) { n.setWeight(weight[1]); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(kr); return n; } }); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index f9e01979a9..6da8dee627 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -209,45 +209,46 @@ public CompletableFuture refresh(K key) { var castedFuture = (CompletableFuture) future; if (refreshed[0]) { castedFuture.whenComplete((newValue, error) -> { + boolean removed = asyncCache.cache().refreshes().remove(keyReference, castedFuture); long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - asyncCache.cache().refreshes().remove(keyReference, castedFuture); asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (ignored, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { - return castedFuture; - } - } else if (currentValue == oldValueFuture[0]) { - long expectedWriteTime = writeTime[0]; - if (asyncCache.cache().hasWriteTime()) { - asyncCache.cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : castedFuture; + var value = asyncCache.cache().compute(key, (ignored, currentValue) -> { + if (currentValue == oldValueFuture[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return castedFuture; + } + } else { + long expectedWriteTime = writeTime[0]; + if (asyncCache.cache().hasWriteTime()) { + asyncCache.cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return (newValue == null) ? null : castedFuture; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && asyncCache.cache().hasRemovalListener()) { + var cause = (value == null) ? 
RemovalCause.EXPLICIT : RemovalCause.REPLACED; + asyncCache.cache().notifyRemoval(key, castedFuture, cause); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } - - asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); } return castedFuture; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index ccc265f986..c98937078c 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -116,45 +116,46 @@ default CompletableFuture refresh(K key) { if (reloading[0] != null) { reloading[0].whenComplete((newValue, error) -> { + boolean removed = cache().refreshes().remove(keyReference, reloading[0]); long loadTime = cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().refreshes().remove(keyReference, reloading[0]); cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (cache().refreshes().get(keyReference) == reloading[0]) { - return newValue; - } - } else if (currentValue == oldValue[0]) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + var value = cache().compute(key, (k, currentValue) -> { + if (currentValue == oldValue[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return newValue; + } + } else { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && cache().hasRemovalListener()) { + var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED; + cache().notifyRemoval(key, newValue, cause); } if (newValue == null) { cache().statsCounter().recordLoadFailure(loadTime); } else { cache().statsCounter().recordLoadSuccess(loadTime); } - - cache().refreshes().remove(keyReference, reloading[0]); }); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java index f351092796..9cc6862fb5 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java @@ -29,6 +29,7 @@ public enum RemovalCause { *
  <li>{@link Cache#invalidate}</li>
 *   <li>{@link Cache#invalidateAll(Iterable)}</li>
 *   <li>{@link Cache#invalidateAll()}</li>
+ *   <li>{@link LoadingCache#refresh}</li>
 *   <li>{@link java.util.Map#remove}</li>
 *   <li>{@link java.util.Map#computeIfPresent}</li>
 *   <li>{@link java.util.Map#compute}</li>
  • diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 1e5b87ad2e..449c9edb23 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -24,6 +24,8 @@ import java.io.Serializable; import java.lang.System.Logger; import java.lang.System.Logger.Level; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; @@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -58,6 +59,7 @@ @SuppressWarnings("deprecation") final class UnboundedLocalCache implements LocalCache { static final Logger logger = System.getLogger(UnboundedLocalCache.class.getName()); + static final VarHandle REFRESHES; @Nullable final RemovalListener removalListener; final ConcurrentHashMap data; @@ -70,7 +72,7 @@ final class UnboundedLocalCache implements LocalCache { @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); @@ -78,11 +80,19 @@ final class UnboundedLocalCache implements LocalCache { this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); this.writer = builder.getCacheWriter(async); - this.refreshes = new AtomicReference<>(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(UnboundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + @Override public boolean hasWriteTime() { return false; @@ -194,16 +204,24 @@ public Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. 
*/ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -305,6 +323,7 @@ public V computeIfAbsent(K key, Function mappingFunction oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -352,6 +371,7 @@ V remap(K key, BiFunction remappingFunction) oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -374,13 +394,10 @@ public int size() { @Override public void clear() { - if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { + if (!hasRemovalListener() + && (writer == CacheWriter.disabledWriter() + && ((refreshes == null) || refreshes.isEmpty()))) { data.clear(); - - var pending = refreshes.get(); - if (pending != null) { - pending.clear(); - } return; } for (K key : data.keySet()) { @@ -422,6 +439,7 @@ public boolean containsValue(Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(key); oldValue[0] = v; return value; }); @@ -467,16 +485,13 @@ public void putAll(Map map) { oldValue[0] = data.remove(key); } else { data.computeIfPresent(castKey, (k, v) -> { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; }); } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -498,7 +513,8 @@ public boolean remove(Object key, Object value) { data.computeIfPresent(castKey, (k, v) -> { if (v.equals(value)) { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; } @@ -506,14 +522,8 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (removed) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } - if (hasRemovalListener()) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); - } + if (removed && hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } return removed; @@ -529,6 +539,7 @@ public boolean remove(Object key, Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(k); oldValue[0] = v; return value; }); @@ -552,6 +563,7 @@ public boolean replace(K key, V oldValue, V newValue) { writer.write(key, newValue); } prev[0] = v; + discardRefresh(k); return newValue; } return v; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index ac4e54d4b7..8babc88905 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -21,6 +21,8 @@ import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; @@ -38,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -48,6 +51,8 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; @@ -366,8 +371,7 @@ public void refresh_dedupe(LoadingCache cache, CacheContext co @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(implementation = Implementation.Caffeine, - executor = CacheExecutor.DIRECT, loader = Loader.NULL, + @CacheSpec(implementation = Implementation.Caffeine, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { var future = cache.refresh(context.firstKey()); @@ -377,6 +381,17 @@ public void refresh_remove(LoadingCache cache, CacheContext co verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); } + @CheckNoWriter + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, + loader = Loader.NULL, population = Population.EMPTY) + public void refresh_ignored(LoadingCache cache, CacheContext context) { + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); + assertThat(cache.estimatedSize(), is(0L)); + assertThat(context.removalNotifications(), is(empty())); + } + @CheckNoWriter @Test(dataProvider = "caches") @CacheSpec(executor = CacheExecutor.DIRECT, loader = Loader.EXCEPTIONAL, @@ -528,11 +543,12 @@ public void refresh_conflict(CacheContext context) { @Test(dataProvider = "caches") @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, removalListener = Listener.CONSUMING) - public void refresh_invalidate(CacheContext context) { + public void refresh_put(CacheContext context) { AtomicBoolean refresh = new AtomicBoolean(); Integer key = context.absentKey(); Integer original = 1; Integer refreshed = 2; + Integer updated = 3; LoadingCache cache = context.build(k -> { await().untilTrue(refresh); return refreshed; @@ -540,13 +556,125 @@ public void refresh_invalidate(CacheContext context) { cache.put(key, original); var future = cache.refresh(key); - cache.invalidate(key); + cache.put(key, updated); refresh.set(true); future.join(); + await().until(() -> cache.getIfPresent(key), is(updated)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.REPLACED)); + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + removalListener = Listener.CONSUMING) + public void refresh_invalidate(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + 
cache.put(key, original); + var future = cache.refresh(key); + await().untilTrue(started); + + cache.invalidate(key); + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(either(nullValue()).or(is(refreshed)))); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.EXPLICIT)); + } + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + expireAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING) + public void refresh_expired(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + cache.put(key, original); + var future = cache.refresh(key); + + await().untilTrue(started); + context.ticker().advance(10, TimeUnit.MINUTES); assertThat(cache.getIfPresent(key), is(nullValue())); - verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(refreshed)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPIRED)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPLICIT)); + } + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + maximumSize = Maximum.ONE, weigher = CacheWeigher.DEFAULT, + removalListener = Listener.CONSUMING) + public void refresh_evicted(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key1 = context.absentKey(); + Integer key2 = key1 + 1; + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().forever().untilTrue(done); + return refreshed; + }); + + cache.put(key1, original); + var future = cache.refresh(key1); + + await().untilTrue(started); + cache.put(key2, original); + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key1), is(refreshed)); + await().until(() -> cache.getIfPresent(key2), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.SIZE)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + await().until(() -> cache.getIfPresent(key2), is(original)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.SIZE)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPLICIT)); + } 
verifyStats(context, verifier -> verifier.success(1).failures(0)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 2f82a0fa70..162fe1d435 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -30,10 +30,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.testng.annotations.Listeners; @@ -209,19 +207,13 @@ public void get_sameFuture(CacheContext context) { public void get_slowRefresh(CacheContext context) { Integer key = context.absentKey(); Integer originalValue = context.absentValue(); - AtomicBoolean reloaded = new AtomicBoolean(); - AtomicInteger reloading = new AtomicInteger(); - ThreadPoolExecutor executor = (ThreadPoolExecutor) - ((TrackingExecutor) context.executor()).delegate(); - LoadingCache cache = context.build(new CacheLoader() { - @Override public Integer load(Integer key) { - throw new AssertionError(); - } - @Override public Integer reload(Integer key, Integer oldValue) { - int count = reloading.incrementAndGet(); - await().untilTrue(reloaded); - return count; - } + Integer refreshedValue = originalValue + 1; + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshedValue; }); cache.put(key, originalValue); @@ -229,17 +221,14 @@ public void get_slowRefresh(CacheContext context) { context.ticker().advance(2, TimeUnit.MINUTES); assertThat(cache.get(key), is(originalValue)); - await().untilAtomic(reloading, is(1)); + await().untilTrue(started); assertThat(cache.getIfPresent(key), is(originalValue)); context.ticker().advance(2, TimeUnit.MINUTES); assertThat(cache.get(key), is(originalValue)); - reloaded.set(true); - await().until(() -> cache.get(key), is(not(originalValue))); - await().until(executor::getQueue, is(empty())); - assertThat(reloading.get(), is(1)); - assertThat(cache.get(key), is(1)); + done.set(true); + await().until(() -> cache.policy().getIfPresentQuietly(key), is(refreshedValue)); } @Test(dataProvider = "caches") diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java index 2c634a17c1..bcaf14d7ae 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java @@ -75,7 +75,7 @@ public static final class ConsumingRemovalListener implements RemovalListener, Serializable { private static final long serialVersionUID = 1L; - private final List> removed; + private final CopyOnWriteArrayList> removed; public ConsumingRemovalListener() { this.removed = new CopyOnWriteArrayList<>();
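
Illustration (not part of the patch): a minimal sketch, written against Caffeine's public LoadingCache API, of the linearizable behavior described in the commit message: an explicit removal while a reload is in flight discards the refresh instead of letting it repopulate the entry. The key, the values, and the latch-driven slow loader are invented for this example.

import java.util.concurrent.CountDownLatch;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

public final class RefreshInvalidateExample {
  public static void main(String[] args) throws Exception {
    CountDownLatch reloadStarted = new CountDownLatch(1);
    CountDownLatch reloadDone = new CountDownLatch(1);

    // The loader blocks so that the refresh is still in flight when the entry is invalidated.
    LoadingCache<String, Integer> cache = Caffeine.newBuilder()
        .build(key -> {
          reloadStarted.countDown();
          reloadDone.await();
          return 2; // the refreshed value
        });

    cache.put("k", 1);
    var refresh = cache.refresh("k"); // asynchronous reload of the current mapping
    reloadStarted.await();

    cache.invalidate("k");            // explicit removal discards the in-flight refresh
    reloadDone.countDown();
    refresh.join();

    // The stale reload result is never written back, so the mapping stays absent. A removal
    // listener would observe RemovalCause.EXPLICIT, not REPLACED, for the discarded value.
    System.out.println(cache.getIfPresent("k")); // null
  }
}

Under the previous behavior (and in Guava), the completed reload could repopulate "k" with the stale value after the invalidation.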