diff --git a/.github/workflows/qodana.yml b/.github/workflows/qodana.yml index 503d924005..d4292e6f61 100644 --- a/.github/workflows/qodana.yml +++ b/.github/workflows/qodana.yml @@ -1,5 +1,4 @@ name: Qodana -permissions: read-all on: [ push, pull_request ] env: @@ -11,6 +10,7 @@ jobs: qodana: runs-on: ubuntu-latest permissions: + checks: write actions: read contents: read security-events: write diff --git a/caffeine/build.gradle b/caffeine/build.gradle index 9ad20bf6ea..40712bc509 100644 --- a/caffeine/build.gradle +++ b/caffeine/build.gradle @@ -136,6 +136,15 @@ tasks.register('memoryOverhead', JavaExec) { group = 'Benchmarks' description = 'Evaluates cache overhead' classpath sourceSets.jmh.runtimeClasspath - jvmArgs "-javaagent:${configurations.javaAgent.singleFile}" + classpath sourceSets.codeGen.runtimeClasspath mainClass = 'com.github.benmanes.caffeine.cache.MemoryBenchmark' + jvmArgs += [ + '--add-opens', 'java.base/java.util.concurrent.atomic=ALL-UNNAMED', + '--add-opens', 'java.base/java.util.concurrent.locks=ALL-UNNAMED', + '--add-opens', 'java.base/java.util.concurrent=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang.ref=ALL-UNNAMED', + '--add-opens', 'java.base/java.lang=ALL-UNNAMED', + '--add-opens', 'java.base/java.util=ALL-UNNAMED', + "-javaagent:${configurations.javaAgent.singleFile}", + ] } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 5a28099f73..703e505308 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -2097,18 +2097,6 @@ public boolean containsValue(Object value) { return value; } - @Override - public @Nullable V getIfPresentQuietly(K key, long[/* 1 */] writeTime) { - V value; - Node node = data.get(nodeFactory.newLookupKey(key)); - if ((node == null) || ((value = node.getValue()) == null) - || hasExpired(node, expirationTicker().read())) { - return null; - } - writeTime[0] = node.getWriteTime(); - return value; - } - /** * Returns the key associated with the mapping in this cache, or {@code null} if there is none. * @@ -2441,6 +2429,11 @@ public boolean remove(Object key, Object value) { @Override public boolean replace(K key, V oldValue, V newValue) { + return replace(key, oldValue, newValue, /* shouldDiscardRefresh */ true); + } + + @Override + public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefresh) { requireNonNull(key); requireNonNull(oldValue); requireNonNull(newValue); @@ -2471,7 +2464,10 @@ public boolean replace(K key, V oldValue, V newValue) { setAccessTime(n, now[0]); setWriteTime(n, now[0]); replaced[0] = true; - discardRefresh(k); + + if (shouldDiscardRefresh) { + discardRefresh(k); + } } return n; }); @@ -2693,7 +2689,7 @@ public void replaceAll(BiFunction function) { * @param computeIfAbsent if an absent entry can be computed * @return the new value associated with the specified key, or null if none */ - @SuppressWarnings("PMD.EmptyIfStmt") + @SuppressWarnings("PMD.EmptyControlStatement") @Nullable V remap(K key, Object keyRef, BiFunction remappingFunction, Expiry expiry, long[/* 1 */] now, boolean computeIfAbsent) { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java index b2b066aa78..c4d27df86e 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncCache.java @@ -206,15 +206,15 @@ default void handleCompletion(K key, CompletableFuture valueFuture, && !(error instanceof TimeoutException)) { logger.log(Level.WARNING, "Exception thrown during asynchronous load", error); } - cache().remove(key, valueFuture); cache().statsCounter().recordLoadFailure(loadTime); + cache().remove(key, valueFuture); } else { @SuppressWarnings("unchecked") var castedFuture = (CompletableFuture) valueFuture; // update the weight and expiration timestamps - cache().replace(key, castedFuture, castedFuture); cache().statsCounter().recordLoadSuccess(loadTime); + cache().replace(key, castedFuture, castedFuture, /* shouldDiscardRefresh */ false); } if (recordMiss) { cache().statsCounter().recordMisses(1); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java index d49606eb1b..8d6a233a80 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalAsyncLoadingCache.java @@ -210,6 +210,7 @@ public CompletableFuture> refreshAll(Iterable keys) { } /** Attempts to avoid a reload if the entry is absent, or a load or reload is in-flight. */ + @SuppressWarnings("FutureReturnValueIgnored") private @Nullable CompletableFuture tryOptimisticRefresh(K key, Object keyReference) { // If a refresh is in-flight, then return it directly. If completed and not yet removed, then // remove to trigger a new reload. @@ -234,7 +235,9 @@ public CompletableFuture> refreshAll(Iterable keys) { @SuppressWarnings("unchecked") var prior = (CompletableFuture) asyncCache.cache() .refreshes().putIfAbsent(keyReference, future); - return (prior == null) ? future : prior; + var result = (prior == null) ? future : prior; + result.whenComplete((r, e) -> asyncCache.cache().refreshes().remove(keyReference, result)); + return result; } else if (!oldValueFuture.isDone()) { // no-op if load is pending return oldValueFuture; @@ -248,12 +251,11 @@ public CompletableFuture> refreshAll(Iterable keys) { @SuppressWarnings("FutureReturnValueIgnored") private @Nullable CompletableFuture tryComputeRefresh(K key, Object keyReference) { long[] startTime = new long[1]; - long[] writeTime = new long[1]; boolean[] refreshed = new boolean[1]; @SuppressWarnings({"unchecked", "rawtypes"}) CompletableFuture[] oldValueFuture = new CompletableFuture[1]; var future = asyncCache.cache().refreshes().computeIfAbsent(keyReference, k -> { - oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key, writeTime); + oldValueFuture[0] = asyncCache.cache().getIfPresentQuietly(key); V oldValue = Async.getIfReady(oldValueFuture[0]); if (oldValue == null) { return null; @@ -282,19 +284,20 @@ public CompletableFuture> refreshAll(Iterable keys) { var castedFuture = (CompletableFuture) future; if (refreshed[0]) { castedFuture.whenComplete((newValue, error) -> { - asyncCache.cache().refreshes().remove(keyReference, castedFuture); long loadTime = asyncCache.cache().statsTicker().read() - startTime[0]; if (error != null) { if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) { logger.log(Level.WARNING, "Exception thrown during refresh", error); } + asyncCache.cache().refreshes().remove(keyReference, castedFuture); asyncCache.cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; var value = asyncCache.cache().compute(key, (ignored, currentValue) -> { - if (currentValue == oldValueFuture[0]) { + var successful = asyncCache.cache().refreshes().remove(keyReference, castedFuture); + if (successful && (currentValue == oldValueFuture[0])) { if (currentValue == null) { // If the entry is absent then discard the refresh and maybe notifying the listener discard[0] = (newValue != null); @@ -305,16 +308,8 @@ public CompletableFuture> refreshAll(Iterable keys) { } else if (newValue == Async.getIfReady((CompletableFuture) currentValue)) { // If the completed futures hold the same value instance then no-op return currentValue; - } else { - // If the entry was not modified while in-flight (no ABA) then replace - long expectedWriteTime = writeTime[0]; - if (asyncCache.cache().hasWriteTime()) { - asyncCache.cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return (newValue == null) ? null : castedFuture; - } } + return (newValue == null) ? null : castedFuture; } // Otherwise, a write invalidated the refresh so discard it and notify the listener discard[0] = true; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index f32187a73f..25660711a8 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -81,16 +81,15 @@ interface LocalCache extends ConcurrentMap { @Nullable V getIfPresentQuietly(Object key); - /** - * See {@link Cache#getIfPresent(K)}. This method differs by not recording the access with - * the statistics nor the eviction policy, and populates the write-time if known. - */ - @Nullable - V getIfPresentQuietly(K key, long[/* 1 */] writeTime); - /** See {@link Cache#getAllPresent}. */ Map getAllPresent(Iterable keys); + /** + * See {@link ConcurrentMap#replace(K, K, V)}. This method differs by optionally not discarding an + * in-flight refresh for the entry if replaced. + */ + boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefresh); + @Override default @Nullable V compute(K key, BiFunction remappingFunction) { diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java index 09012db01f..bbe706c78d 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalLoadingCache.java @@ -98,7 +98,6 @@ default Map loadSequentially(Iterable keys) { default CompletableFuture refresh(K key) { requireNonNull(key); - long[] writeTime = new long[1]; long[] startTime = new long[1]; @SuppressWarnings("unchecked") V[] oldValue = (V[]) new Object[1]; @@ -113,7 +112,7 @@ default CompletableFuture refresh(K key) { try { startTime[0] = cache().statsTicker().read(); - oldValue[0] = cache().getIfPresentQuietly(key, writeTime); + oldValue[0] = cache().getIfPresentQuietly(key); var refreshFuture = (oldValue[0] == null) ? cacheLoader().asyncLoad(key, cache().executor()) : cacheLoader().asyncReload(key, oldValue[0], cache().executor()); @@ -131,34 +130,21 @@ default CompletableFuture refresh(K key) { if (reloading[0] != null) { reloading[0].whenComplete((newValue, error) -> { - boolean removed = cache().refreshes().remove(keyReference, reloading[0]); long loadTime = cache().statsTicker().read() - startTime[0]; if (error != null) { if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) { logger.log(Level.WARNING, "Exception thrown during refresh", error); } + cache().refreshes().remove(keyReference, reloading[0]); cache().statsCounter().recordLoadFailure(loadTime); return; } boolean[] discard = new boolean[1]; var value = cache().compute(key, (k, currentValue) -> { - if (currentValue == oldValue[0]) { - if (currentValue == null) { - if (newValue == null) { - return null; - } else if (removed) { - return newValue; - } - } else { - long expectedWriteTime = writeTime[0]; - if (cache().hasWriteTime()) { - cache().getIfPresentQuietly(key, writeTime); - } - if (writeTime[0] == expectedWriteTime) { - return newValue; - } - } + boolean removed = cache().refreshes().remove(keyReference, reloading[0]); + if (removed && (currentValue == oldValue[0])) { + return (currentValue == null) && (newValue == null) ? null : newValue; } discard[0] = (currentValue != newValue); return currentValue; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java index 9ebf188411..ca45579e77 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/UnboundedLocalCache.java @@ -136,11 +136,6 @@ public Object referenceKey(K key) { return data.get(key); } - @Override - public @Nullable V getIfPresentQuietly(K key, long[/* 1 */] writeTime) { - return data.get(key); - } - @Override public long estimatedSize() { return data.mappingCount(); @@ -519,6 +514,11 @@ public boolean remove(Object key, Object value) { @Override public boolean replace(K key, V oldValue, V newValue) { + return replace(key, oldValue, newValue, /* shouldDiscardRefresh */ true); + } + + @Override + public boolean replace(K key, V oldValue, V newValue, boolean shouldDiscardRefresh) { requireNonNull(oldValue); requireNonNull(newValue); @@ -526,8 +526,10 @@ public boolean replace(K key, V oldValue, V newValue) { V[] prev = (V[]) new Object[1]; data.computeIfPresent(key, (k, v) -> { if (v.equals(oldValue)) { + if (shouldDiscardRefresh) { + discardRefresh(k); + } prev[0] = v; - discardRefresh(k); return newValue; } return v; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 8278b4d9ae..393f412698 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -35,6 +35,7 @@ import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.VARIABLE; import static com.github.benmanes.caffeine.cache.testing.CacheSubject.assertThat; import static com.github.benmanes.caffeine.testing.Awaits.await; +import static com.github.benmanes.caffeine.testing.FutureSubject.assertThat; import static com.github.benmanes.caffeine.testing.MapSubject.assertThat; import static com.google.common.truth.Truth.assertThat; import static java.lang.Thread.State.BLOCKED; @@ -57,14 +58,17 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Future; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Listeners; import org.testng.annotations.Test; @@ -1417,6 +1421,65 @@ public CompletableFuture asyncReload(Int key, Int oldValue, Executor execut await().untilAsserted(() -> assertThat(cache).containsEntry(context.absentKey(), newValue)); } + @Test(dataProvider = "caches", groups = "isolated") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.THREADED, + compute = Compute.ASYNC, stats = Stats.DISABLED) + public void refresh_startReloadBeforeLoadCompletion(CacheContext context) { + var stats = Mockito.mock(StatsCounter.class); + var beganLoadSuccess = new AtomicBoolean(); + var endLoadSuccess = new CountDownLatch(1); + var beganReloading = new AtomicBoolean(); + var beganLoading = new AtomicBoolean(); + var endReloading = new AtomicBoolean(); + var endLoading = new AtomicBoolean(); + + context.ticker().setAutoIncrementStep(Duration.ofSeconds(1)); + context.caffeine().recordStats(() -> stats); + var asyncCache = context.buildAsync(new CacheLoader() { + @Override public Int load(Int key) { + beganLoading.set(true); + await().untilTrue(endLoading); + return new Int(ThreadLocalRandom.current().nextInt()); + } + @Override public Int reload(Int key, Int oldValue) { + beganReloading.set(true); + await().untilTrue(endReloading); + return new Int(ThreadLocalRandom.current().nextInt()); + } + }); + + Answer answer = invocation -> { + beganLoadSuccess.set(true); + endLoadSuccess.await(); + return null; + }; + doAnswer(answer).when(stats).recordLoadSuccess(anyLong()); + + // Start load + var future1 = asyncCache.get(context.absentKey()); + await().untilTrue(beganLoading); + + // Complete load; start load callback + endLoading.set(true); + await().untilTrue(beganLoadSuccess); + + // Start reload + var refresh = asyncCache.synchronous().refresh(context.absentKey()); + await().untilTrue(beganReloading); + + // Complete load callback + endLoadSuccess.countDown(); + await().untilAsserted(() -> assertThat(future1.getNumberOfDependents()).isEqualTo(0)); + + // Complete reload callback + endReloading.set(true); + await().untilAsserted(() -> assertThat(refresh.getNumberOfDependents()).isEqualTo(0)); + + // Assert new value + await().untilAsserted(() -> + assertThat(asyncCache.get(context.absentKey())).succeedsWith(refresh.get())); + } + /* --------------- Miscellaneous --------------- */ @Test diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CacheTest.java index b3b72681fe..e6270b3356 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CacheTest.java @@ -61,6 +61,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Implementation; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Listener; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; @@ -769,7 +770,8 @@ public void invalidateAll_null(Cache cache, CacheContext context) { @CheckNoStats @Test(dataProvider = "caches") @CacheSpec(population = Population.FULL, compute = Compute.SYNC, - executor = CacheExecutor.REJECTING, removalListener = Listener.CONSUMING) + executorFailure = ExecutorFailure.IGNORED, executor = CacheExecutor.REJECTING, + removalListener = Listener.CONSUMING) public void removalListener_rejected(Cache cache, CacheContext context) { cache.invalidateAll(); assertThat(context).removalNotifications().withCause(EXPLICIT) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java index 5bfc7da2dd..923562a3e0 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RefreshAfterWriteTest.java @@ -63,7 +63,6 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec.Population; import com.github.benmanes.caffeine.cache.testing.CacheValidationListener; import com.github.benmanes.caffeine.cache.testing.CheckNoEvictions; -import com.github.benmanes.caffeine.cache.testing.TrackingExecutor; import com.github.benmanes.caffeine.testing.ConcurrentTestHarness; import com.github.benmanes.caffeine.testing.Int; import com.github.valfirst.slf4jtest.TestLoggerFactory; @@ -217,16 +216,15 @@ public void refreshIfNeeded_noChange(CacheContext context) { refreshAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING, loader = Loader.IDENTITY, executor = CacheExecutor.THREADED) public void refreshIfNeeded_discard(LoadingCache cache, CacheContext context) { - var executor = (TrackingExecutor) context.executor(); - executor.pause(); + context.executor().pause(); context.ticker().advance(2, TimeUnit.MINUTES); cache.get(context.firstKey()); assertThat(cache.policy().refreshes()).isNotEmpty(); cache.put(context.firstKey(), context.absentValue()); - executor.resume(); + context.executor().resume(); - await().until(() -> executor.submitted() == executor.completed()); + await().until(() -> context.executor().submitted() == context.executor().completed()); assertThat(cache).containsEntry(context.firstKey(), context.absentValue()); assertThat(context).removalNotifications().withCause(REPLACED) @@ -241,16 +239,15 @@ public void refreshIfNeeded_discard(LoadingCache cache, CacheContext c refreshAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING, loader = Loader.IDENTITY, executor = CacheExecutor.THREADED) public void refreshIfNeeded_absent_newValue(LoadingCache cache, CacheContext context) { - var executor = (TrackingExecutor) context.executor(); - executor.pause(); + context.executor().pause(); context.ticker().advance(2, TimeUnit.MINUTES); cache.get(context.firstKey()); assertThat(cache.policy().refreshes()).isNotEmpty(); cache.invalidate(context.firstKey()); - executor.resume(); + context.executor().resume(); - await().until(() -> executor.submitted() == executor.completed()); + await().until(() -> context.executor().submitted() == context.executor().completed()); assertThat(cache).doesNotContainKey(context.firstKey()); assertThat(context).removalNotifications().withCause(REPLACED) @@ -266,16 +263,15 @@ public void refreshIfNeeded_absent_newValue(LoadingCache cache, CacheC refreshAfterWrite = Expire.ONE_MINUTE, removalListener = Listener.CONSUMING, loader = Loader.NULL, executor = CacheExecutor.THREADED) public void refreshIfNeeded_absent_nullValue(LoadingCache cache, CacheContext context) { - var executor = (TrackingExecutor) context.executor(); - executor.pause(); + context.executor().pause(); context.ticker().advance(2, TimeUnit.MINUTES); cache.get(context.firstKey()); assertThat(cache.policy().refreshes()).isNotEmpty(); cache.invalidate(context.firstKey()); - executor.resume(); + context.executor().resume(); - await().until(() -> executor.submitted() == executor.completed()); + await().until(() -> context.executor().submitted() == context.executor().completed()); assertThat(cache).doesNotContainKey(context.firstKey()); assertThat(context).removalNotifications().withCause(EXPLICIT) @@ -768,8 +764,7 @@ public void invalidate(CacheContext context) { cache.invalidate(key); refresh.set(true); - var executor = (TrackingExecutor) context.executor(); - await().until(() -> executor.submitted() == executor.completed()); + await().until(() -> context.executor().submitted() == context.executor().completed()); if (context.isGuava()) { // Guava does not protect against ABA when the entry was removed by allowing a possibly @@ -793,25 +788,24 @@ public void invalidate(CacheContext context) { loader = Loader.ASYNC_INCOMPLETE, refreshAfterWrite = Expire.ONE_MINUTE) public void refresh(LoadingCache cache, CacheContext context) { cache.put(context.absentKey(), context.absentValue()); - var executor = (TrackingExecutor) context.executor(); int submitted; // trigger an automatic refresh - submitted = executor.submitted(); + submitted = context.executor().submitted(); context.ticker().advance(2, TimeUnit.MINUTES); cache.getIfPresent(context.absentKey()); - assertThat(executor.submitted()).isEqualTo(submitted + 1); + assertThat(context.executor().submitted()).isEqualTo(submitted + 1); // return in-flight future var future1 = cache.refresh(context.absentKey()); - assertThat(executor.submitted()).isEqualTo(submitted + 1); + assertThat(context.executor().submitted()).isEqualTo(submitted + 1); future1.complete(intern(context.absentValue().negate())); // trigger a new automatic refresh - submitted = executor.submitted(); + submitted = context.executor().submitted(); context.ticker().advance(2, TimeUnit.MINUTES); cache.getIfPresent(context.absentKey()); - assertThat(executor.submitted()).isEqualTo(submitted + 1); + assertThat(context.executor().submitted()).isEqualTo(submitted + 1); var future2 = cache.refresh(context.absentKey()); assertThat(future2).isNotSameInstanceAs(future1); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java index e891051875..88641b06f4 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/Stresser.java @@ -24,7 +24,6 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.LongAdder; import org.apache.commons.lang3.StringUtils; @@ -52,8 +51,8 @@ public final class Stresser implements Runnable { private static final String[] STATUS = { "Idle", "Required", "Processing -> Idle", "Processing -> Required" }; private static final int MAX_THREADS = 2 * Runtime.getRuntime().availableProcessors(); - private static final int WRITE_MAX_SIZE = (1 << 12); - private static final int TOTAL_KEYS = (1 << 20); + private static final int WRITE_MAX_SIZE = (1 << 12); // 4,096 + private static final int TOTAL_KEYS = (1 << 20); // 1,048,576 private static final int MASK = TOTAL_KEYS - 1; private static final int STATUS_INTERVAL = 5; @@ -63,7 +62,6 @@ public final class Stresser implements Runnable { private BoundedLocalCache local; private LoadingCache cache; - private LongAdder pendingReloads; private Stopwatch stopwatch; private Integer[] ints; @@ -86,13 +84,13 @@ private void initialize() { .recordStats() .build(key -> key); local = (BoundedLocalCache) cache.asMap(); - pendingReloads = new LongAdder(); ints = new Integer[TOTAL_KEYS]; Arrays.setAll(ints, key -> { cache.put(key, key); return key; }); cache.cleanUp(); + local.refreshes(); stopwatch = Stopwatch.createStarted(); status(); } @@ -111,8 +109,7 @@ private void execute() { cache.put(key, key); break; case REFRESH: - pendingReloads.increment(); - cache.refresh(key).thenRun(pendingReloads::decrement); + cache.refresh(key); break; } } @@ -138,7 +135,7 @@ private void status() { System.out.printf("Size = %,d (max: %,d)%n", local.data.mappingCount(), workload.maxEntries); System.out.printf("Lock = [%s%n", StringUtils.substringAfter( local.evictionLock.toString(), "[")); - System.out.printf("Pending reloads = %,d%n", pendingReloads.sum()); + System.out.printf("Pending reloads = %,d%n", local.refreshes.size()); System.out.printf("Pending tasks = %,d%n", ForkJoinPool.commonPool().getQueuedSubmissionCount()); @@ -163,7 +160,7 @@ public static void main(String[] args) { private enum Workload { READ(MAX_THREADS, TOTAL_KEYS), WRITE(MAX_THREADS, WRITE_MAX_SIZE), - REFRESH(1, WRITE_MAX_SIZE); + REFRESH(Math.max(1, Runtime.getRuntime().availableProcessors() / 2), TOTAL_KEYS / 2); private final int maxThreads; private final int maxEntries; diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java index 1bfb7025fb..96b2d75e2b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java @@ -26,7 +26,6 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; @@ -86,6 +85,7 @@ public final class CacheContext { final Listener removalListenerType; final CacheExecutor cacheExecutor; final ReferenceType valueStrength; + final TrackingExecutor executor; final ReferenceType keyStrength; final Expiry expiry; final Map original; @@ -97,7 +97,6 @@ public final class CacheContext { final Expire afterAccess; final Expire afterWrite; final Expire expiryTime; - final Executor executor; final Compute compute; final Expire refresh; final Loader loader; @@ -470,7 +469,7 @@ public CacheExecutor executorType() { return cacheExecutor; } - public Executor executor() { + public TrackingExecutor executor() { return executor; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java index 60ce393417..1ed957b24a 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java @@ -768,20 +768,20 @@ enum CacheExecutor { THREADED(() -> new TrackingExecutor(ConcurrentTestHarness.executor)), // Cache implementations must avoid corrupting internal state due to rejections REJECTING(() -> { - return new ForkJoinPool() { + return new TrackingExecutor(new ForkJoinPool() { @Override public void execute(Runnable task) { throw new RejectedExecutionException(); } - }; + }); }); - private final Supplier executor; + private final Supplier executor; - CacheExecutor(Supplier executor) { + CacheExecutor(Supplier executor) { this.executor = requireNonNull(executor); } - public Executor create() { + public TrackingExecutor create() { return executor.get(); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java index 8449501ddd..67b22f5f2a 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java @@ -164,14 +164,14 @@ private void validate(ITestResult testResult) { /** Waits until the executor has completed all of the submitted work. */ private void awaitExecutor(CacheContext context) { - if (context.executor() instanceof TrackingExecutor) { - var executor = (TrackingExecutor) context.executor(); - executor.resume(); + if (context.executor() != null) { + context.executor().resume(); if ((context.cacheExecutor != CacheExecutor.DIRECT) && (context.cacheExecutor != CacheExecutor.DISCARDING) - && (executor.submitted() != executor.completed())) { - await().pollInSameThread().until(() -> executor.submitted() == executor.completed()); + && (context.executor().submitted() != context.executor().completed())) { + await().pollInSameThread().until(() -> + context.executor().submitted() == context.executor().completed()); } } } @@ -209,15 +209,14 @@ private static void checkExecutor(ITestResult testResult, CacheContext context) } assertWithMessage("CacheContext required").that(context).isNotNull(); - if (!(context.executor() instanceof TrackingExecutor)) { + if (context.executor() == null) { return; } - var executor = (TrackingExecutor) context.executor(); if (cacheSpec.executorFailure() == ExecutorFailure.EXPECTED) { - assertThat(executor.failed()).isGreaterThan(0); + assertThat(context.executor().failed()).isGreaterThan(0); } else if (cacheSpec.executorFailure() == ExecutorFailure.DISALLOWED) { - assertThat(executor.failed()).isEqualTo(0); + assertThat(context.executor().failed()).isEqualTo(0); } } diff --git a/caffeine/testing.gradle b/caffeine/testing.gradle index 8c57636c40..58c30ec76a 100644 --- a/caffeine/testing.gradle +++ b/caffeine/testing.gradle @@ -109,6 +109,7 @@ class Stresser extends DefaultTask { @TaskAction def run() { project.javaexec { mainClass = 'com.github.benmanes.caffeine.cache.Stresser' + classpath project.sourceSets.codeGen.runtimeClasspath classpath project.sourceSets.test.runtimeClasspath if (operation) { args '--workload', operation diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index f86fe1ce8c..c77df3b1fe 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -74,7 +74,7 @@ ext { jctools: '3.3.0', junit: '4.13.2', lincheck: '2.14.1', - mockito: '4.5.1', + mockito: '4.6.0', paxExam: '4.13.5', slf4jTest: '2.6.1', testng: '7.6.0', @@ -86,7 +86,7 @@ ext { ] pluginVersions = [ bnd: '6.2.0', - checkstyle: '10.2', + checkstyle: '10.3', coveralls: '2.12.0', dependencyCheck: '7.1.0.1', errorprone: '2.0.2', @@ -96,7 +96,7 @@ ext { jmhReport: '0.9.0', nexusPublish: '1.1.0', nullaway: '1.3.0', - pmd: '6.45.0', + pmd: '6.46.0', semanticVersioning: '1.1.0', snyke: '0.4', sonarqube: '3.3',