diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index e7b9996669..5dd70490ad 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -62,7 +62,6 @@ import java.util.concurrent.ForkJoinTask; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -211,6 +210,8 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1); /** The maximum duration before an entry expires. */ static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years + /** The handle for the in-flight refresh operations. */ + static final VarHandle REFRESHES; final ConcurrentHashMap> data; @Nullable final CacheLoader cacheLoader; @@ -227,7 +228,7 @@ abstract class BoundedLocalCache extends BLCHeader.DrainStatusRef @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; /** Creates an instance based on the builder's configuration. */ protected BoundedLocalCache(Caffeine builder, @@ -236,7 +237,6 @@ protected BoundedLocalCache(Caffeine builder, this.cacheLoader = cacheLoader; executor = builder.getExecutor(); evictionLock = new ReentrantLock(); - refreshes = new AtomicReference<>(); weigher = builder.getWeigher(isAsync); writer = builder.getCacheWriter(isAsync); drainBuffersTask = new PerformCleanupTask(this); @@ -252,6 +252,15 @@ protected BoundedLocalCache(Caffeine builder, } } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(BoundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + /* --------------- Shared --------------- */ /** Returns if the node's value is currently being computed, asynchronously. */ @@ -296,16 +305,24 @@ public final Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. */ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + /** Returns whether this cache notifies a writer when an entry is modified. */ protected boolean hasWriter() { return (writer != CacheWriter.disabledWriter()); @@ -966,6 +983,7 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { writer.delete(key, value[0], actualCause[0]); makeDead(n); } + discardRefresh(keyReference); removed[0] = true; return null; }); @@ -995,12 +1013,6 @@ boolean evictEntry(Node node, RemovalCause cause, long now) { if (removed[0]) { statsCounter().recordEviction(node.getWeight(), actualCause[0]); - - var pending = refreshes.get(); - if (pending != null) { - pending.remove(keyReference); - } - if (hasRemovalListener()) { // Notify the listener only if the entry was evicted. This must be performed as the last // step during eviction to safe guard against the executor rejecting the notification task. @@ -1848,12 +1860,8 @@ public void clear() { } // Discard all entries - var pending = refreshes.get(); for (var entry : data.entrySet()) { removeNode(entry.getValue(), now); - if (pending != null) { - pending.remove(entry.getKey()); - } } // Discard all pending reads @@ -1889,6 +1897,8 @@ void removeNode(Node node, long now) { if (key != null) { writer.delete(key, value[0], cause[0]); } + + discardRefresh(node.getKeyReference()); makeDead(n); return null; } @@ -2064,6 +2074,7 @@ public Map getAllPresent(Iterable keys) { Node computed = node; prior = data.computeIfAbsent(node.getKeyReference(), k -> { writer.write(key, value); + discardRefresh(k); return computed; }); if (prior == node) { @@ -2088,6 +2099,8 @@ public Map getAllPresent(Iterable keys) { afterRead(prior, now, /* recordHit */ false); return currentValue; } + } else { + discardRefresh(prior.getKeyReference()); } V oldValue; @@ -2184,16 +2197,12 @@ public Map getAllPresent(Iterable keys) { writer.delete(castKey, oldValue[0], cause[0]); n.retire(); } + discardRefresh(lookupKey); node[0] = n; return null; }); if (cause[0] != null) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } - afterWrite(new RemovalTask(node[0])); if (hasRemovalListener()) { notifyRemoval(castKey, oldValue[0], cause[0]); @@ -2232,6 +2241,7 @@ public boolean remove(Object key, Object value) { return node; } writer.delete(oldKey[0], oldValue[0], cause[0]); + discardRefresh(lookupKey); removed[0] = node; node.retire(); return null; @@ -2242,10 +2252,6 @@ public boolean remove(Object key, Object value) { return false; } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(lookupKey); - } if (hasRemovalListener()) { notifyRemoval(oldKey[0], oldValue[0], cause[0]); } @@ -2286,6 +2292,7 @@ public boolean remove(Object key, Object value) { setVariableTime(n, varTime); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2342,6 +2349,7 @@ public boolean replace(K key, V oldValue, V newValue) { setAccessTime(n, now[0]); setWriteTime(n, now[0]); replaced[0] = true; + discardRefresh(k); } return n; }); @@ -2464,6 +2472,7 @@ public void replaceAll(BiFunction function) { setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0])); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(k); return n; } }); @@ -2620,6 +2629,7 @@ public void replaceAll(BiFunction function) { if (newValue[0] == null) { if (cause[0] == null) { cause[0] = RemovalCause.EXPLICIT; + discardRefresh(kr); } removed[0] = n; n.retire(); @@ -2641,6 +2651,7 @@ public void replaceAll(BiFunction function) { n.setWeight(weight[1]); setAccessTime(n, now[0]); setWriteTime(n, now[0]); + discardRefresh(kr); return n; } }); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index f9e01979a9..6da8dee627 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -209,45 +209,46 @@ public CompletableFuture refresh(K key) { var castedFuture = (CompletableFuture) future; if (refreshed[0]) { castedFuture.whenComplete((newValue, error) -> { + boolean removed = asyncCache.cache().refreshes().remove(keyReference, castedFuture); long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - asyncCache.cache().refreshes().remove(keyReference, castedFuture); asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - asyncCache.cache().compute(key, (ignored, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (asyncCache.cache().refreshes().get(key) == castedFuture) { - return castedFuture; - } - } else if (currentValue == oldValueFuture[0]) { - long expectedWriteTime = writeTime[0]; - if (asyncCache.cache().hasWriteTime()) { - asyncCache.cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : castedFuture; + var value = asyncCache.cache().compute(key, (ignored, currentValue) -> { + if (currentValue == oldValueFuture[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return castedFuture; + } + } else { + long expectedWriteTime = writeTime[0]; + if (asyncCache.cache().hasWriteTime()) { + asyncCache.cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return (newValue == null) ? null : castedFuture; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && asyncCache.cache().hasRemovalListener()) { - asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && asyncCache.cache().hasRemovalListener()) { + var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED; + asyncCache.cache().notifyRemoval(key, castedFuture, cause); } if (newValue == null) { asyncCache.cache().statsCounter().recordLoadFailure(loadTime); } else { asyncCache.cache().statsCounter().recordLoadSuccess(loadTime); } - - asyncCache.cache().refreshes().remove(keyReference, castedFuture); }); } return castedFuture; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index ccc265f986..c98937078c 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -116,45 +116,46 @@ default CompletableFuture refresh(K key) { if (reloading[0] != null) { reloading[0].whenComplete((newValue, error) -> { + boolean removed = cache().refreshes().remove(keyReference, reloading[0]); long loadTime = cache().statsTicker().read() - startTime[0]; if (error != null) { logger.log(Level.WARNING, "Exception thrown during refresh", error); - cache().refreshes().remove(keyReference, reloading[0]); cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; - cache().compute(key, (k, currentValue) -> { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (cache().refreshes().get(keyReference) == reloading[0]) { - return newValue; - } - } else if (currentValue == oldValue[0]) { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; + var value = cache().compute(key, (k, currentValue) -> { + if (currentValue == oldValue[0]) { + if (currentValue == null) { + if (newValue == null) { + return null; + } else if (removed) { + return newValue; + } + } else { + long expectedWriteTime = writeTime[0]; + if (cache().hasWriteTime()) { + cache().getIfPresentQuietly(key, writeTime); + } + if (writeTime[0] == expectedWriteTime) { + return newValue; + } } } discard[0] = true; return currentValue; }, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true); - if (discard[0] && cache().hasRemovalListener()) { - cache().notifyRemoval(key, newValue, RemovalCause.REPLACED); + if (discard[0] && (newValue != null) && cache().hasRemovalListener()) { + var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED; + cache().notifyRemoval(key, newValue, cause); } if (newValue == null) { cache().statsCounter().recordLoadFailure(loadTime); } else { cache().statsCounter().recordLoadSuccess(loadTime); } - - cache().refreshes().remove(keyReference, reloading[0]); }); } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java index f351092796..9cc6862fb5 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/RemovalCause.java @@ -29,6 +29,7 @@ public enum RemovalCause { *
  • {@link Cache#invalidate}
  • *
  • {@link Cache#invalidateAll(Iterable)}
  • *
  • {@link Cache#invalidateAll()}
  • + *
  • {@link LoadingCache#refresh}
  • *
  • {@link java.util.Map#remove}
  • *
  • {@link java.util.Map#computeIfPresent}
  • *
  • {@link java.util.Map#compute}
  • diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 1e5b87ad2e..449c9edb23 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -24,6 +24,8 @@ import java.io.Serializable; import java.lang.System.Logger; import java.lang.System.Logger.Level; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.VarHandle; import java.util.AbstractCollection; import java.util.AbstractSet; import java.util.Collection; @@ -38,7 +40,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Consumer; @@ -58,6 +59,7 @@ @SuppressWarnings("deprecation") final class UnboundedLocalCache implements LocalCache { static final Logger logger = System.getLogger(UnboundedLocalCache.class.getName()); + static final VarHandle REFRESHES; @Nullable final RemovalListener removalListener; final ConcurrentHashMap data; @@ -70,7 +72,7 @@ final class UnboundedLocalCache implements LocalCache { @Nullable Set keySet; @Nullable Collection values; @Nullable Set> entrySet; - AtomicReference>> refreshes; + @Nullable volatile ConcurrentMap> refreshes; UnboundedLocalCache(Caffeine builder, boolean async) { this.data = new ConcurrentHashMap<>(builder.getInitialCapacity()); @@ -78,11 +80,19 @@ final class UnboundedLocalCache implements LocalCache { this.removalListener = builder.getRemovalListener(async); this.isRecordingStats = builder.isRecordingStats(); this.writer = builder.getCacheWriter(async); - this.refreshes = new AtomicReference<>(); this.executor = builder.getExecutor(); this.ticker = builder.getTicker(); } + static { + try { + REFRESHES = MethodHandles.lookup() + .findVarHandle(UnboundedLocalCache.class, "refreshes", ConcurrentMap.class); + } catch (ReflectiveOperationException e) { + throw new ExceptionInInitializerError(e); + } + } + @Override public boolean hasWriteTime() { return false; @@ -194,16 +204,24 @@ public Executor executor() { @Override @SuppressWarnings("NullAway") public ConcurrentMap> refreshes() { - var pending = refreshes.get(); + var pending = refreshes; if (pending == null) { pending = new ConcurrentHashMap<>(); - if (!refreshes.compareAndSet(null, pending)) { - pending = refreshes.get(); + if (!REFRESHES.compareAndSet(this, null, pending)) { + pending = refreshes; } } return pending; } + /** Invalidate the in-flight refresh. */ + void discardRefresh(Object keyReference) { + var pending = refreshes; + if (pending != null) { + pending.remove(keyReference); + } + } + @Override public Ticker expirationTicker() { return Ticker.disabledTicker(); @@ -305,6 +323,7 @@ public V computeIfAbsent(K key, Function mappingFunction oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -352,6 +371,7 @@ V remap(K key, BiFunction remappingFunction) oldValue[0] = value; } + discardRefresh(k); return newValue; }); if (oldValue[0] != null) { @@ -374,13 +394,10 @@ public int size() { @Override public void clear() { - if (!hasRemovalListener() && (writer == CacheWriter.disabledWriter())) { + if (!hasRemovalListener() + && (writer == CacheWriter.disabledWriter() + && ((refreshes == null) || refreshes.isEmpty()))) { data.clear(); - - var pending = refreshes.get(); - if (pending != null) { - pending.clear(); - } return; } for (K key : data.keySet()) { @@ -422,6 +439,7 @@ public boolean containsValue(Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(key); oldValue[0] = v; return value; }); @@ -467,16 +485,13 @@ public void putAll(Map map) { oldValue[0] = data.remove(key); } else { data.computeIfPresent(castKey, (k, v) -> { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; }); } - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } if (hasRemovalListener() && (oldValue[0] != null)) { notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } @@ -498,7 +513,8 @@ public boolean remove(Object key, Object value) { data.computeIfPresent(castKey, (k, v) -> { if (v.equals(value)) { - writer.delete(castKey, v, RemovalCause.EXPLICIT); + writer.delete(k, v, RemovalCause.EXPLICIT); + discardRefresh(k); oldValue[0] = v; return null; } @@ -506,14 +522,8 @@ public boolean remove(Object key, Object value) { }); boolean removed = (oldValue[0] != null); - if (removed) { - var pending = refreshes.get(); - if (pending != null) { - pending.remove(key); - } - if (hasRemovalListener()) { - notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); - } + if (removed && hasRemovalListener()) { + notifyRemoval(castKey, oldValue[0], RemovalCause.EXPLICIT); } return removed; @@ -529,6 +539,7 @@ public boolean remove(Object key, Object value) { if (value != v) { writer.write(key, value); } + discardRefresh(k); oldValue[0] = v; return value; }); @@ -552,6 +563,7 @@ public boolean replace(K key, V oldValue, V newValue) { writer.write(key, newValue); } prev[0] = v; + discardRefresh(k); return newValue; } return v; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java index ac4e54d4b7..8babc88905 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/LoadingCacheTest.java @@ -21,6 +21,8 @@ import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.either; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.is; @@ -38,6 +40,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; @@ -48,6 +51,8 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Loader; @@ -366,8 +371,7 @@ public void refresh_dedupe(LoadingCache cache, CacheContext co @CheckNoWriter @Test(dataProvider = "caches") - @CacheSpec(implementation = Implementation.Caffeine, - executor = CacheExecutor.DIRECT, loader = Loader.NULL, + @CacheSpec(implementation = Implementation.Caffeine, loader = Loader.NULL, population = { Population.SINGLETON, Population.PARTIAL, Population.FULL }) public void refresh_remove(LoadingCache cache, CacheContext context) { var future = cache.refresh(context.firstKey()); @@ -377,6 +381,17 @@ public void refresh_remove(LoadingCache cache, CacheContext co verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); } + @CheckNoWriter + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, + loader = Loader.NULL, population = Population.EMPTY) + public void refresh_ignored(LoadingCache cache, CacheContext context) { + var future = cache.refresh(context.absentKey()); + assertThat(future.join(), is(nullValue())); + assertThat(cache.estimatedSize(), is(0L)); + assertThat(context.removalNotifications(), is(empty())); + } + @CheckNoWriter @Test(dataProvider = "caches") @CacheSpec(executor = CacheExecutor.DIRECT, loader = Loader.EXCEPTIONAL, @@ -528,11 +543,12 @@ public void refresh_conflict(CacheContext context) { @Test(dataProvider = "caches") @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, removalListener = Listener.CONSUMING) - public void refresh_invalidate(CacheContext context) { + public void refresh_put(CacheContext context) { AtomicBoolean refresh = new AtomicBoolean(); Integer key = context.absentKey(); Integer original = 1; Integer refreshed = 2; + Integer updated = 3; LoadingCache cache = context.build(k -> { await().untilTrue(refresh); return refreshed; @@ -540,13 +556,125 @@ public void refresh_invalidate(CacheContext context) { cache.put(key, original); var future = cache.refresh(key); - cache.invalidate(key); + cache.put(key, updated); refresh.set(true); future.join(); + await().until(() -> cache.getIfPresent(key), is(updated)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.REPLACED)); + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + removalListener = Listener.CONSUMING) + public void refresh_invalidate(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + cache.put(key, original); + var future = cache.refresh(key); + await().untilTrue(started); + + cache.invalidate(key); + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(either(nullValue()).or(is(refreshed)))); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.EXPLICIT)); + } + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + expireAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING) + public void refresh_expired(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key = context.absentKey(); + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshed; + }); + + cache.put(key, original); + var future = cache.refresh(key); + + await().untilTrue(started); + context.ticker().advance(10, TimeUnit.MINUTES); assertThat(cache.getIfPresent(key), is(nullValue())); - verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPLICIT)); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key), is(refreshed)); + verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPIRED)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPLICIT)); + } + verifyStats(context, verifier -> verifier.success(1).failures(0)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + maximumSize = Maximum.ONE, weigher = CacheWeigher.DEFAULT, + removalListener = Listener.CONSUMING) + public void refresh_evicted(CacheContext context) { + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + Integer key1 = context.absentKey(); + Integer key2 = key1 + 1; + Integer original = 1; + Integer refreshed = 2; + LoadingCache cache = context.build(k -> { + started.set(true); + await().forever().untilTrue(done); + return refreshed; + }); + + cache.put(key1, original); + var future = cache.refresh(key1); + + await().untilTrue(started); + cache.put(key2, original); + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + + done.set(true); + future.join(); + + if (context.implementation() == Implementation.Guava) { + await().until(() -> cache.getIfPresent(key1), is(refreshed)); + await().until(() -> cache.getIfPresent(key2), is(nullValue())); + verifyRemovalListener(context, verifier -> verifier.hasOnly(2, RemovalCause.SIZE)); + } else { + // linearizable + await().until(() -> cache.getIfPresent(key1), is(nullValue())); + await().until(() -> cache.getIfPresent(key2), is(original)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.SIZE)); + verifyRemovalListener(context, verifier -> verifier.hasCount(1, RemovalCause.EXPLICIT)); + } verifyStats(context, verifier -> verifier.success(1).failures(0)); } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 2f82a0fa70..162fe1d435 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -30,10 +30,8 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.testng.annotations.Listeners; @@ -209,19 +207,13 @@ public void get_sameFuture(CacheContext context) { public void get_slowRefresh(CacheContext context) { Integer key = context.absentKey(); Integer originalValue = context.absentValue(); - AtomicBoolean reloaded = new AtomicBoolean(); - AtomicInteger reloading = new AtomicInteger(); - ThreadPoolExecutor executor = (ThreadPoolExecutor) - ((TrackingExecutor) context.executor()).delegate(); - LoadingCache cache = context.build(new CacheLoader() { - @Override public Integer load(Integer key) { - throw new AssertionError(); - } - @Override public Integer reload(Integer key, Integer oldValue) { - int count = reloading.incrementAndGet(); - await().untilTrue(reloaded); - return count; - } + Integer refreshedValue = originalValue + 1; + AtomicBoolean started = new AtomicBoolean(); + AtomicBoolean done = new AtomicBoolean(); + LoadingCache cache = context.build(k -> { + started.set(true); + await().untilTrue(done); + return refreshedValue; }); cache.put(key, originalValue); @@ -229,17 +221,14 @@ public void get_slowRefresh(CacheContext context) { context.ticker().advance(2, TimeUnit.MINUTES); assertThat(cache.get(key), is(originalValue)); - await().untilAtomic(reloading, is(1)); + await().untilTrue(started); assertThat(cache.getIfPresent(key), is(originalValue)); context.ticker().advance(2, TimeUnit.MINUTES); assertThat(cache.get(key), is(originalValue)); - reloaded.set(true); - await().until(() -> cache.get(key), is(not(originalValue))); - await().until(executor::getQueue, is(empty())); - assertThat(reloading.get(), is(1)); - assertThat(cache.get(key), is(1)); + done.set(true); + await().until(() -> cache.policy().getIfPresentQuietly(key), is(refreshedValue)); } @Test(dataProvider = "caches") diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java index 2c634a17c1..bcaf14d7ae 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/RemovalListeners.java @@ -75,7 +75,7 @@ public static final class ConsumingRemovalListener implements RemovalListener, Serializable { private static final long serialVersionUID = 1L; - private final List> removed; + private final CopyOnWriteArrayList> removed; public ConsumingRemovalListener() { this.removed = new CopyOnWriteArrayList<>();