diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index a2c24b45f6..8a3b142559 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -62,7 +62,6 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -210,6 +209,8 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1); /** The maximum duration before an entry expires. */ static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years + /** The handle for the in-flight refresh operations. */ + static final VarHandle REFRESHES; final ConcurrentHashMap> data; @Nullable final CacheLoader cacheLoader; @@ -226,7 +227,7 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -236,7 +237,6 @@ protected BoundedLocalCache(Caffeine builder, executor = builder.getExecutor(); writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); - refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); @@ -251,6 +251,15 @@ protected BoundedLocalCache(Caffeine builder, } } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(BoundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + /* --------------- Shared --------------- */ /** Returns if the node's value is currently being computed, asynchronously. */ @@ -295,16 +304,24 @@ public final Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. */ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); @@ -959,6 +976,7 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { } makeDead(n); } + discardRefresh(keyReference); removed[0] = true; return null; }); @@ -988,12 +1006,6 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); - - var pending = refreshes.get(); - if (pending != null) { - pending.remove(keyReference); - } - if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1819,12 +1831,8 @@ public void clear() { } // Discard all entries - var pending = refreshes.get(); for (var entry : data.entrySet()) { removeNode(entry.getValue(), now); - if (pending != null) { - pending.remove(entry.getKey()); - } } // Discard all pending reads @@ -1860,6 +1868,8 @@ void removeNode(Node node, long now) { if (key != null) { writer.delete(key, value[0], cause[0]); } + + discardRefresh(node.getKeyReference()); makeDead(n); return null; } @@ -2035,6 +2045,7 @@ public Map getAllPresent(Iterable keys) { Node computed = node; prior = data.computeIfAbsent(node.getKeyReference(), k -> { writer.write(key, value); + discardRefresh(k); return computed; }); if (prior == node) { @@ -2059,6 +2070,8 @@ public Map getAllPresent(Iterable keys) { afterRead(prior, now, /* recordHit */ false); return currentValue; } + } else { + discardRefresh(prior.getKeyReference()); } V oldValue; @@ -2155,16 +2168,12 @@ public Map getAllPresent(Iterable keys) { writer.delete(castKey, oldValue[0], cause[0]); n.retire(); } + discardRefresh(lookupKey); node[0] = n; return null; }); if (cause[0] != null) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } - afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2203,6 +2212,7 @@ public boolean remove(Object key, Object value) { return node; } writer.delete(oldKey[0], oldValue[0], cause[0]); + discardRefresh(lookupKey); removed[0] = node; node.retire(); return null; @@ -2213,10 +2223,6 @@ public boolean remove(Object key, Object value) { return false; } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } @@ -2257,6 +2263,7 @@ public boolean remove(Object key, Object value) { setVariableTime(n, varTime); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2313,6 +2320,7 @@ public boolean replace(K key, V oldValue, V newValue) { setAccessTime(n, now[0]); setWriteTime(n, now[0]); replaced[0] = true; + discardRefresh(k); } return n; }); @@ -2435,6 +2443,7 @@ public void replaceAll(BiFunction function) { setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0])); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2591,6 +2600,7 @@ public void replaceAll(BiFunction function) { if (newValue[0] == null) { if (cause[0] == null) { cause[0] = RemovalCause.EXPLICIT; + discardRefresh(kr); } removed[0] = n; n.retire(); @@ -2612,6 +2622,7 @@ public void replaceAll(BiFunction function) { n.setWeight(weight[1]); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(kr); return n; } }); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index f9e01979a9..6da8dee627 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -209,45 +209,46 @@ public CompletableFuture refresh(K key) { var castedFuture = (CompletableFuture) future; if (refreshed[0]) { castedFuture.whenComplete((newValue, error) -> { + boolean removed = asyncCache.cache().refreshes().remove(keyReference, castedFuture); long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - asyncCache.cache().refreshes().remove(keyReference, castedFuture); asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (ignored, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { - return castedFuture; - } - } else if (currentValue == oldValueFuture[0]) { - long expectedWriteTime = writeTime[0]; - if (asyncCache.cache().hasWriteTime()) { - asyncCache.cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : castedFuture; + var value = asyncCache.cache().compute(key, (ignored, currentValue) -> { + if (currentValue == oldValueFuture[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return castedFuture; + } + } else { + long expectedWriteTime = writeTime[0]; + if (asyncCache.cache().hasWriteTime()) { + asyncCache.cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return (newValue == null) ? null : castedFuture; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && asyncCache.cache().hasRemovalListener()) { + var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED; + asyncCache.cache().notifyRemoval(key, castedFuture, cause); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } - - asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); } return castedFuture; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index ccc265f986..c98937078c 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -116,45 +116,46 @@ default CompletableFuture refresh(K key) { if (reloading[0] != null) { reloading[0].whenComplete((newValue, error) -> { + boolean removed = cache().refreshes().remove(keyReference, reloading[0]); long loadTime = cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().refreshes().remove(keyReference, reloading[0]); cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (cache().refreshes().get(keyReference) == reloading[0]) { - return newValue; - } - } else if (currentValue == oldValue[0]) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + var value = cache().compute(key, (k, currentValue) -> { + if (currentValue == oldValue[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return newValue; + } + } else { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && cache().hasRemovalListener()) { + var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED; + cache().notifyRemoval(key, newValue, cause); } if (newValue == null) { cache().statsCounter().recordLoadFailure(loadTime); } else { cache().statsCounter().recordLoadSuccess(loadTime); } - - cache().refreshes().remove(keyReference, reloading[0]); }); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 3d957da929..34dc6a9c31 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -24,6 +24,8 @@ import java.io.Serializable; import java.lang.System.Logger; import java.lang.System.Logger.Level; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; @@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -57,6 +58,7 @@ */ final class UnboundedLocalCache implements LocalCache { static final Logger logger = System.getLogger(UnboundedLocalCache.class.getName()); + static final VarHandle REFRESHES; @Nullable final RemovalListener removalListener; final ConcurrentHashMap data; @@ -69,19 +71,27 @@ final class UnboundedLocalCache implements LocalCache { @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); this.statsCounter = builder.getStatsCounterSupplier().get(); this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); - this.refreshes = new AtomicReference<>(); this.writer = builder.getCacheWriter(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(UnboundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + @Override public boolean hasWriteTime() { return false; @@ -193,16 +203,24 @@ public Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. */ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -304,6 +322,7 @@ public V computeIfAbsent(K key, Function mappingFunction oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -351,6 +370,7 @@ V remap(K key, BiFunction remappingFunction) oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -373,13 +393,10 @@ public int size() { @Override public void clear() { - if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { + if (!hasRemovalListener() + && (writer == CacheWriter.disabledWriter() + && ((refreshes == null) || refreshes.isEmpty()))) { data.clear(); - - var pending = refreshes.get(); - if (pending != null) { - pending.clear(); - } return; } for (K key : data.keySet()) { @@ -421,6 +438,7 @@ public boolean containsValue(Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(key); oldValue[0] = v; return value; }); @@ -466,16 +484,13 @@ public void putAll(Map map) { oldValue[0] = data.remove(key); } else { data.computeIfPresent(castKey, (k, v) -> { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; }); } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -497,7 +512,8 @@ public boolean remove(Object key, Object value) { data.computeIfPresent(castKey, (k, v) -> { if (v.equals(value)) { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; } @@ -505,14 +521,8 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (removed) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } - if (hasRemovalListener()) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); - } + if (removed && hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } return removed; @@ -528,6 +538,7 @@ public boolean remove(Object key, Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(k); oldValue[0] = v; return value; }); @@ -551,6 +562,7 @@ public boolean replace(K key, V oldValue, V newValue) { writer.write(key, newValue); } prev[0] = v; + discardRefresh(k); return newValue; } return v; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index 2a30a1e848..8eccb85ce1 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -25,6 +25,8 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; @@ -42,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -52,6 +55,8 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; @@ -378,8 +383,7 @@ public void refresh_dedupe(LoadingCache cache, CacheContext co @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(implementation = Implementation.Caffeine, - executor = CacheExecutor.DIRECT, loader = Loader.NULL, + @CacheSpec(implementation = Implementation.Caffeine, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { var future = cache.refresh(context.firstKey()); @@ -389,6 +393,17 @@ public void refresh_remove(LoadingCache cache, CacheContext co assertThat(cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); } + @CheckNoWriter + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, + loader = Loader.NULL, population = Population.EMPTY) + public void refresh_ignored(LoadingCache cache, CacheContext context) { + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); + assertThat(cache.estimatedSize(), is(0L)); + assertThat(context.consumedNotifications(), is(empty())); + } + @CheckNoWriter @Test(dataProvider = "caches") @CacheSpec(executor = CacheExecutor.DIRECT, loader = Loader.EXCEPTIONAL, @@ -544,11 +559,12 @@ public void refresh_conflict(CacheContext context) { @Test(dataProvider = "caches") @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, removalListener = Listener.CONSUMING) - public void refresh_invalidate(CacheContext context) { + public void refresh_put(CacheContext context) { AtomicBoolean refresh = new AtomicBoolean(); Integer key = context.absentKey(); Integer original = 1; Integer refreshed = 2; + Integer updated = 3; LoadingCache cache = context.build(k -> { await().untilTrue(refresh); return refreshed; @@ -556,13 +572,125 @@ public void refresh_invalidate(CacheContext context) { cache.put(key, original); var future = cache.refresh(key); - cache.invalidate(key); + cache.put(key, updated); refresh.set(true); future.join(); + await().until(() -> cache.getIfPresent(key), is(updated)); + await().until(() -> cache, hasRemovalNotifications(context, 2, RemovalCause.REPLACED)); + assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + removalListener = Listener.CONSUMING) + public void refresh_invalidate(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + cache.put(key, original); + var future = cache.refresh(key); + await().untilTrue(started); + + cache.invalidate(key); + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(either(nullValue()).or(is(refreshed)))); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + await().until(() -> cache, hasRemovalNotifications(context, 2, RemovalCause.EXPLICIT)); + } + assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + expireAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING) + public void refresh_expired(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + cache.put(key, original); + var future = cache.refresh(key); + + await().untilTrue(started); + context.ticker().advance(10, TimeUnit.MINUTES); assertThat(cache.getIfPresent(key), is(nullValue())); - await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(refreshed)); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPIRED)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPIRED)); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); + } + assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + maximumSize = Maximum.ONE, weigher = CacheWeigher.DEFAULT, + removalListener = Listener.CONSUMING) + public void refresh_evicted(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key1 = context.absentKey(); + Integer key2 = key1 + 1; + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().forever().untilTrue(done); + return refreshed; + }); + + cache.put(key1, original); + var future = cache.refresh(key1); + + await().untilTrue(started); + cache.put(key2, original); + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key1), is(refreshed)); + await().until(() -> cache.getIfPresent(key2), is(nullValue())); + await().until(() -> cache, hasRemovalNotifications(context, 2, RemovalCause.SIZE)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + await().until(() -> cache.getIfPresent(key2), is(original)); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.SIZE)); + await().until(() -> cache, hasRemovalNotifications(context, 1, RemovalCause.EXPLICIT)); + } assertThat(context, both(hasLoadSuccessCount(1)).and(hasLoadFailureCount(0))); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index c72bcada70..43ea1827d1 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -24,7 +24,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; @@ -32,7 +31,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -213,17 +211,10 @@ public void get_slowRefresh(CacheContext context) { Integer originalValue = context.absentValue(); AtomicBoolean reloaded = new AtomicBoolean(); AtomicInteger reloading = new AtomicInteger(); - ThreadPoolExecutor executor = (ThreadPoolExecutor) - ((TrackingExecutor) context.executor()).delegate(); - LoadingCache cache = context.build(new CacheLoader() { - @Override public Integer load(Integer key) { - throw new AssertionError(); - } - @Override public Integer reload(Integer key, Integer oldValue) { - int count = reloading.incrementAndGet(); - await().untilTrue(reloaded); - return count; - } + LoadingCache cache = context.build(k -> { + int count = reloading.incrementAndGet(); + await().untilTrue(reloaded); + return count; }); cache.put(key, originalValue); @@ -239,8 +230,7 @@ public void get_slowRefresh(CacheContext context) { reloaded.set(true); await().until(() -> cache.get(key), is(not(originalValue))); - await().until(executor::getQueue, is(empty())); - assertThat(reloading.get(), is(1)); + await().untilAtomic(reloading, is(1)); assertThat(cache.get(key), is(1)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java index 23bb665fbe..2dbd3e3633 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java @@ -23,9 +23,9 @@ import static org.hamcrest.Matchers.nullValue; import java.io.Serializable; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -351,7 +351,7 @@ public RemovalListener removalListener() { return requireNonNull(removalListener); } - public List> consumedNotifications() { + public Collection> consumedNotifications() { return (removalListenerType() == Listener.CONSUMING) ? ((ConsumingRemovalListener) removalListener).evicted() : Collections.emptyList(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasRemovalNotifications.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasRemovalNotifications.java index c11a3ff491..ef49c1871b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasRemovalNotifications.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/HasRemovalNotifications.java @@ -21,7 +21,6 @@ import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; -import java.util.List; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; @@ -61,7 +60,7 @@ protected boolean matchesSafely(Object ignored, Description description) { DescriptionBuilder desc = new DescriptionBuilder(description); if (context.removalListenerType == Listener.CONSUMING) { - List> notifications = context.consumedNotifications(); + var notifications = context.consumedNotifications(); ForkJoinPool.commonPool().awaitQuiescence(10, TimeUnit.SECONDS); int size = 0; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java index c95b9e4c40..9e76a2a48b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java @@ -19,8 +19,9 @@ import java.io.Serializable; import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; import com.github.benmanes.caffeine.cache.RemovalCause; @@ -76,10 +77,10 @@ public static final class ConsumingRemovalListener implements RemovalListener, Serializable { private static final long serialVersionUID = 1L; - private final List> evicted; + private final Queue> evicted; public ConsumingRemovalListener() { - this.evicted = Collections.synchronizedList(new ArrayList<>()); + this.evicted = new ConcurrentLinkedQueue<>(); } @Override @@ -88,8 +89,8 @@ public void onRemoval(K key, V value, RemovalCause cause) { evicted.add(new RemovalNotification<>(key, value, cause)); } - public List> evicted() { - return evicted; + public Collection> evicted() { + return new ArrayList<>(evicted); } } }