From 18133a74398ba73e702b1573126265147811fd0a Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Mon, 16 Jan 2023 11:26:02 -0800 Subject: [PATCH] ensure prompt clean up if a scheduler is configured (fixes #859) A benign race can occur when a maintenance cycle is running, writers insert new entries, and all of the entries that the policy is aware of have expired so they are removed. In this case no clean up was scheduled by the expiration policy, the maintenance status is left as "required", and entries in the write buffer are waiting to be applied. If the cache is idle then those entries will expire and the removal listener will not be notified promptly. As a scheduler is configured to communicate that a prompt notification is desired, the cache should more aggressively use it to ensure this case is handled. --- .github/workflows/build.yml | 7 +- .../cache/impl/ConcurrentHashMapV7.java | 4 +- .../caffeine/cache/BoundedLocalCache.java | 46 ++++- .../caffeine/cache/BoundedLocalCacheTest.java | 173 ++++++++++++++---- .../caffeine/cache/issues/Issue859Test.java | 86 +++++++++ gradle/codeQuality.gradle | 2 +- gradle/dependencies.gradle | 6 +- 7 files changed, 279 insertions(+), 45 deletions(-) create mode 100644 caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue859Test.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0cbd904f19..a6a96995e8 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -238,6 +238,7 @@ jobs: name: Test Results runs-on: ubuntu-latest needs: tests + if: github.event_name == 'push' permissions: checks: write steps: @@ -264,15 +265,17 @@ jobs: ignore_runs: true job_summary: true - name: Create badge + id: test-badge env: LABEL: tests COLOR: 31c653 STATUS: ${{ fromJSON(steps.test-results.outputs.json).formatted.stats.runs }} - run: curl -s https://badgen.net/badge/${{env.LABEL}}/${{env.STATUS}}/${{env.COLOR}} > badge.svg + continue-on-error: true + run: curl -s -f 
https://badgen.net/badge/${{env.LABEL}}/${{env.STATUS}}/${{env.COLOR}} > badge.svg - name: Upload badge to Gist uses: popsiclestick/gist-sync-action@v1.2.0 if: > - github.event_name == 'push' + steps.test-badge.outcome == 'success' && endsWith(github.ref, github.event.repository.default_branch) with: gist_url: https://gist.githubusercontent.com/ben-manes/c20eb418f0e0bd6dfe1c25beb35faae4 diff --git a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/impl/ConcurrentHashMapV7.java b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/impl/ConcurrentHashMapV7.java index ab95bbe3ac..968943e251 100644 --- a/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/impl/ConcurrentHashMapV7.java +++ b/caffeine/src/jmh/java/com/github/benmanes/caffeine/cache/impl/ConcurrentHashMapV7.java @@ -113,8 +113,8 @@ * @param the type of keys maintained by this map * @param the type of mapped values */ -@SuppressWarnings({"all", "deprecation", "JdkObsolete", "rawtypes", "serial", - "unchecked", "UnnecessaryParentheses", "UnusedNestedClass", "UnusedVariable"}) +@SuppressWarnings({"all", "deprecation", "JdkObsolete", "rawtypes", "serial", "unchecked", + "UnnecessaryParentheses", "UnusedNestedClass", "UnusedVariable", "YodaCondition"}) public class ConcurrentHashMapV7 extends AbstractMap implements ConcurrentMap, Serializable { private static final long serialVersionUID = 7249069246763182397L; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index dbfac63b79..bda0a169a2 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -1530,6 +1530,7 @@ void afterWrite(Runnable task) { } finally { evictionLock.unlock(); } + rescheduleCleanUpIfIncomplete(); } /** Acquires the eviction lock. 
*/ @@ -1638,8 +1639,41 @@ void performCleanUp(@Nullable Runnable task) { } finally { evictionLock.unlock(); } - if ((drainStatusOpaque() == REQUIRED) && (executor == ForkJoinPool.commonPool())) { + rescheduleCleanUpIfIncomplete(); + } + + /** + * If there remain pending operations that were not handled by the prior clean up then try to + * schedule an asynchronous maintenance task. This may occur due to a concurrent write after the + * maintenance work had started or if the amortized threshold of work per clean up was reached. + */ + void rescheduleCleanUpIfIncomplete() { + if (drainStatusOpaque() != REQUIRED) { + return; + } + + // An immediate scheduling cannot be performed on a custom executor because it may use a + // caller-runs policy. This could cause the caller's penalty to exceed the amortized threshold, + // e.g. repeated concurrent writes could result in a retry loop. + if (executor == ForkJoinPool.commonPool()) { scheduleDrainBuffers(); + return; + } + + // If a scheduler was configured then the maintenance can be deferred onto the custom executor + // to be run some time into the future. This is only leveraged if it was not otherwise + // scheduled by a pending expiration event. Otherwise, rely on other cache activity to trigger + // the next run.
+ var pacer = pacer(); + if ((pacer != null) && (pacer.future == null) && evictionLock.tryLock()) { + try { + if ((pacer.future == null) && (drainStatusOpaque() == REQUIRED)) { + pacer.schedule(executor, drainBuffersTask, + expirationTicker().read(), Pacer.TOLERANCE); + } + } finally { + evictionLock.unlock(); + } } } @@ -3123,6 +3157,7 @@ T snapshot(Iterable> iterable, Function transformer, } } finally { evictionLock.unlock(); + rescheduleCleanUpIfIncomplete(); } } @@ -4029,9 +4064,13 @@ final class BoundedEviction implements Eviction { if (cache.evicts() && isWeighted()) { cache.evictionLock.lock(); try { + if (cache.drainStatusOpaque() == REQUIRED) { + cache.maintenance(/* ignored */ null); + } return OptionalLong.of(Math.max(0, cache.weightedSize())); } finally { cache.evictionLock.unlock(); + cache.rescheduleCleanUpIfIncomplete(); } } return OptionalLong.empty(); @@ -4039,9 +4078,13 @@ final class BoundedEviction implements Eviction { @Override public long getMaximum() { cache.evictionLock.lock(); try { + if (cache.drainStatusOpaque() == REQUIRED) { + cache.maintenance(/* ignored */ null); + } return cache.maximum(); } finally { cache.evictionLock.unlock(); + cache.rescheduleCleanUpIfIncomplete(); } } @Override public void setMaximum(long maximum) { @@ -4051,6 +4094,7 @@ final class BoundedEviction implements Eviction { cache.maintenance(/* ignored */ null); } finally { cache.evictionLock.unlock(); + cache.rescheduleCleanUpIfIncomplete(); } } @Override public Map coldest(int limit) { diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 763127772a..06a8948555 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -54,7 +54,9 @@ import static org.mockito.Mockito.doCallRealMethod; import 
static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static uk.org.lidalia.slf4jext.ConventionalLevelHierarchy.WARN_LEVELS; @@ -310,6 +312,87 @@ public void shouldDrainBuffers_invalidDrainStatus() { } } + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO) + public void rescheduleCleanUpIfIncomplete_complete( + BoundedLocalCache cache, CacheContext context) { + reset(context.scheduler()); + for (int status : new int[] { IDLE, PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED}) { + cache.drainStatus = status; + cache.rescheduleCleanUpIfIncomplete(); + verifyNoInteractions(context.scheduler()); + assertThat(cache.drainStatus).isEqualTo(status); + assertThat(context.executor().completed()).isEqualTo(context.population().size()); + } + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY) + public void rescheduleCleanUpIfIncomplete_incompatible( + BoundedLocalCache cache, CacheContext context) { + cache.drainStatus = REQUIRED; + cache.rescheduleCleanUpIfIncomplete(); + assertThat(cache.drainStatus).isEqualTo(REQUIRED); + assertThat(context.executor().completed()).isEqualTo(context.population().size()); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, executor = CacheExecutor.DEFAULT) + public void rescheduleCleanUpIfIncomplete_immediate( + BoundedLocalCache cache, CacheContext context) { + cache.drainStatus = REQUIRED; + cache.rescheduleCleanUpIfIncomplete(); + await().until(() -> cache.drainStatus, is(IDLE)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO) + public void 
rescheduleCleanUpIfIncomplete_notScheduled_future( + BoundedLocalCache cache, CacheContext context) { + reset(context.scheduler()); + cache.drainStatus = REQUIRED; + cache.pacer().future = DisabledFuture.INSTANCE; + + cache.rescheduleCleanUpIfIncomplete(); + verifyNoInteractions(context.scheduler()); + await().until(() -> cache.drainStatus, is(REQUIRED)); + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO) + public void rescheduleCleanUpIfIncomplete_notScheduled_locked( + BoundedLocalCache cache, CacheContext context) { + reset(context.scheduler()); + cache.drainStatus = REQUIRED; + cache.pacer().cancel(); + + var done = new AtomicBoolean(); + cache.evictionLock.lock(); + try { + ConcurrentTestHarness.execute(() -> { + cache.rescheduleCleanUpIfIncomplete(); + done.set(true); + }); + await().untilTrue(done); + verifyNoInteractions(context.scheduler()); + await().until(() -> cache.drainStatus, is(REQUIRED)); + } finally { + cache.evictionLock.unlock(); + } + } + + @Test(dataProvider = "caches") + @CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO) + public void rescheduleCleanUpIfIncomplete_scheduled( + BoundedLocalCache cache, CacheContext context) { + reset(context.scheduler()); + cache.drainStatus = REQUIRED; + cache.pacer().cancel(); + + cache.rescheduleCleanUpIfIncomplete(); + verify(context.scheduler()).schedule(any(), any(), anyLong(), any()); + } + @Test(dataProvider = "caches") @CacheSpec(population = Population.EMPTY) public void afterWrite_drainFullWriteBuffer( @@ -382,99 +465,117 @@ public void afterWrite_exception() { assertThat(event.getLevel()).isEqualTo(ERROR); } + @Test(dataProvider = "caches") + @CacheSpec(maximumSize = Maximum.FULL, weigher = CacheWeigher.TEN) + public void weightedSize_maintenance(BoundedLocalCache cache, + CacheContext context, Eviction eviction) { + cache.drainStatus = REQUIRED; + eviction.weightedSize(); + 
assertThat(cache.drainStatus).isEqualTo(IDLE); + } + + @Test(dataProvider = "caches") + @CacheSpec(maximumSize = Maximum.FULL) + public void maximumSize_maintenance(BoundedLocalCache cache, + CacheContext context, Eviction eviction) { + cache.drainStatus = REQUIRED; + eviction.getMaximum(); + assertThat(cache.drainStatus).isEqualTo(IDLE); + } + /* --------------- Eviction --------------- */ @Test(dataProvider = "caches") @CacheSpec(maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.MAX_VALUE) - public void overflow_add_one(BoundedLocalCache map, CacheContext context) { - long actualWeight = map.weightedSize(); - map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); - map.put(context.absentKey(), context.absentValue()); + public void overflow_add_one(BoundedLocalCache cache, CacheContext context) { + long actualWeight = cache.weightedSize(); + cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); + cache.put(context.absentKey(), context.absentValue()); - assertThat(map).hasSize(context.initialSize()); - assertThat(map.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY); + assertThat(cache).hasSize(context.initialSize()); + assertThat(cache.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY); var removed = new HashMap<>(context.original()); removed.put(context.absentKey(), context.absentValue()); - removed.keySet().removeAll(map.keySet()); + removed.keySet().removeAll(cache.keySet()); assertThat(context).notifications().hasSize(1); assertThat(context).notifications().withCause(SIZE).contains(removed).exclusively(); // reset for validation listener - map.setWeightedSize(actualWeight); + cache.setWeightedSize(actualWeight); } @Test(dataProvider = "caches") @CacheSpec(maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.MAX_VALUE) - public void overflow_add_many(BoundedLocalCache map, CacheContext context) { - long actualWeight = map.weightedSize(); - map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); - 
map.evictionLock.lock(); + public void overflow_add_many(BoundedLocalCache cache, CacheContext context) { + long actualWeight = cache.weightedSize(); + cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); + cache.evictionLock.lock(); try { - map.putAll(context.absent()); + cache.putAll(context.absent()); } finally { - map.evictionLock.unlock(); + cache.evictionLock.unlock(); } - map.cleanUp(); + cache.cleanUp(); - assertThat(map).hasSize(context.initialSize()); - assertThat(map.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY); + assertThat(cache).hasSize(context.initialSize()); + assertThat(cache.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY); var removed = new HashMap<>(context.original()); removed.putAll(context.absent()); - removed.keySet().removeAll(map.keySet()); + removed.keySet().removeAll(cache.keySet()); assertThat(context).notifications().hasSize(context.absent().size()); assertThat(context).notifications().withCause(SIZE).contains(removed).exclusively(); // reset for validation listener - map.setWeightedSize(actualWeight); + cache.setWeightedSize(actualWeight); } @Test(dataProvider = "caches") @CacheSpec(population = Population.FULL, maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.VALUE) - public void overflow_update_one(BoundedLocalCache map, CacheContext context) { - map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); - map.put(context.firstKey(), Int.MAX_VALUE); + public void overflow_update_one(BoundedLocalCache cache, CacheContext context) { + cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); + cache.put(context.firstKey(), Int.MAX_VALUE); - assertThat(map).hasSizeLessThan(1 + context.initialSize()); - assertThat(map.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY); + assertThat(cache).hasSizeLessThan(1 + context.initialSize()); + assertThat(cache.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY); var removed = new HashMap<>(context.original()); 
removed.put(context.firstKey(), Int.MAX_VALUE); - removed.keySet().removeAll(map.keySet()); + removed.keySet().removeAll(cache.keySet()); assertThat(removed.size()).isAtLeast(1); assertThat(context).notifications().withCause(SIZE).contains(removed); // reset for validation listener - map.setWeightedSize(map.data.values().stream().mapToLong(Node::getWeight).sum()); + cache.setWeightedSize(cache.data.values().stream().mapToLong(Node::getWeight).sum()); } @Test(dataProvider = "caches") @CacheSpec(population = Population.FULL, maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.VALUE) - public void overflow_update_many(BoundedLocalCache map, CacheContext context) { + public void overflow_update_many(BoundedLocalCache cache, CacheContext context) { var updated = Maps.asMap(context.firstMiddleLastKeys(), key -> Int.MAX_VALUE); - map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); - map.evictionLock.lock(); + cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY); + cache.evictionLock.lock(); try { - map.putAll(updated); + cache.putAll(updated); } finally { - map.evictionLock.unlock(); + cache.evictionLock.unlock(); } - map.cleanUp(); + cache.cleanUp(); - assertThat(map).hasSizeLessThan(1 + context.initialSize()); - assertThat(map.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY); + assertThat(cache).hasSizeLessThan(1 + context.initialSize()); + assertThat(cache.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY); var removed = new HashMap<>(context.original()); removed.putAll(updated); - removed.keySet().removeAll(map.keySet()); + removed.keySet().removeAll(cache.keySet()); assertThat(removed.size()).isAtLeast(1); assertThat(context).notifications().withCause(SIZE).contains(removed); // reset for validation listener - map.setWeightedSize(map.data.values().stream().mapToLong(Node::getWeight).sum()); + cache.setWeightedSize(cache.data.values().stream().mapToLong(Node::getWeight).sum()); } @CheckNoEvictions diff --git 
a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue859Test.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue859Test.java new file mode 100644 index 0000000000..fe713d00dc --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/issues/Issue859Test.java @@ -0,0 +1,86 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.issues; + +import static com.github.benmanes.caffeine.testing.ConcurrentTestHarness.executor; +import static com.google.common.truth.Truth.assertThat; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.concurrent.CountDownLatch; + +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.google.common.util.concurrent.Uninterruptibles; + +/** + * Issue #859: Removal listener not called (due to no activity and pending work in write buffer) + *

+ * While a maintenance cycle is running it disables scheduling of a clean up by concurrent writers. + * When complete the drain status is restored to required and previously this may have had to wait + * for other activity to trigger the clean up. That could be an excessive delay if the write buffer + * contains inserted entries, they expire, and a prompt removal notification is expected due to a + * configured scheduler. To avoid this delay, the maintenance cycle should be scheduled. + * + * @author mario-schwede-hivemq (Mario Schwede) + * @author ben.manes@gmail.com (Ben Manes) + */ +@Test(groups = "isolated") +public final class Issue859Test { + private static final int NUMBER_OF_RUNS = 100_000; + private static final int NUMBER_OF_KEYS = 10; + + @Test + void scheduleIfPendingWrites() { + var runs = new ArrayList(); + for (int i = 1; i <= NUMBER_OF_RUNS; i++) { + runs.add(runTest()); + } + for (var run : runs) { + boolean finished = Uninterruptibles.awaitUninterruptibly(run.latch, Duration.ofSeconds(5)); + assertThat(finished).isTrue(); + } + } + + private TestRun runTest() { + var latch = new CountDownLatch(NUMBER_OF_KEYS); + Cache cache = Caffeine.newBuilder() + .removalListener((key, value, cause) -> latch.countDown()) + .expireAfterWrite(Duration.ofMillis(5)) + .scheduler(Scheduler.systemScheduler()) + .executor(executor) + .build(); + for (int i = 0; i < NUMBER_OF_KEYS; i++) { + var key = i; + executor.execute(() -> cache.put(key, Boolean.TRUE)); + } + + // The scheduler maintains a weak reference to the cache, so it must be held strongly until the + // test completes. 
+ return new TestRun(cache, latch); + } + + static final class TestRun { + final Cache cache; + final CountDownLatch latch; + + TestRun(Cache cache, CountDownLatch latch) { + this.cache = cache; + this.latch = latch; + } + } +} diff --git a/gradle/codeQuality.gradle b/gradle/codeQuality.gradle index e892df9563..b958a33cc4 100644 --- a/gradle/codeQuality.gradle +++ b/gradle/codeQuality.gradle @@ -82,7 +82,7 @@ forbiddenApis { bundledSignatures += [ 'jdk-deprecated', 'jdk-internal', 'jdk-unsafe', 'jdk-non-portable', 'jdk-reflection', 'jdk-system-out'] - failOnUnresolvableSignatures = false + ignoreSignaturesOfMissingClasses = true } spotbugs { diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 6f1f2137a8..1fef047552 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -54,7 +54,7 @@ ext { jmh: '1.36', joor: '0.9.14', jsr330: '1', - nullaway: '0.10.7', + nullaway: '0.10.8', ohc: '0.6.1', osgiComponentAnnotations: '1.5.1', picocli: '4.7.0', @@ -77,10 +77,10 @@ ext { jcacheTck: '1.1.1', jctools: '4.0.1', junit4: '4.13.2', - junit5: '5.9.1', + junit5: '5.9.2', junitTestNG: '1.0.4', lincheck: '2.16', - mockito: '4.11.0', + mockito: '5.0.0', osgiUtilFunction: '1.2.0', osgiUtilPromise: '1.3.0', paxExam: '4.13.5',