Skip to content

Commit

Permalink
ensure prompt clean up if a scheduler is configured (fixes #859)
Browse files Browse the repository at this point in the history
A benign race can occur when a maintenance cycle is running, writers
insert new entries, and all of the entries that the policy is aware of
have expired so they are removed. In this case no clean up was scheduled
by the expiration policy, the maintenance status is left as "required",
and entries in the write buffer are waiting to be applied. If the cache
is idle then those entries will expire and the removal listener will not
be notified promptly. Since configuring a scheduler communicates that
prompt notification is desired, the cache should use it more aggressively
to ensure this case is handled.
  • Loading branch information
ben-manes committed Jan 16, 2023
1 parent a6aa066 commit 18133a7
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 45 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ jobs:
name: Test Results
runs-on: ubuntu-latest
needs: tests
if: github.event_name == 'push'
permissions:
checks: write
steps:
Expand All @@ -264,15 +265,17 @@ jobs:
ignore_runs: true
job_summary: true
- name: Create badge
id: test-badge
env:
LABEL: tests
COLOR: 31c653
STATUS: ${{ fromJSON(steps.test-results.outputs.json).formatted.stats.runs }}
run: curl -s https://badgen.net/badge/${{env.LABEL}}/${{env.STATUS}}/${{env.COLOR}} > badge.svg
continue-on-error: true
run: curl -s -f https://badgen.net/badge/${{env.LABEL}}/${{env.STATUS}}/${{env.COLOR}} > badge.svg
- name: Upload badge to Gist
uses: popsiclestick/[email protected]
if: >
github.event_name == 'push'
steps.test-badge.outcome == 'success'
&& endsWith(github.ref, github.event.repository.default_branch)
with:
gist_url: https://gist.githubusercontent.com/ben-manes/c20eb418f0e0bd6dfe1c25beb35faae4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@
* @param <K> the type of keys maintained by this map
* @param <V> the type of mapped values
*/
@SuppressWarnings({"all", "deprecation", "JdkObsolete", "rawtypes", "serial",
"unchecked", "UnnecessaryParentheses", "UnusedNestedClass", "UnusedVariable"})
@SuppressWarnings({"all", "deprecation", "JdkObsolete", "rawtypes", "serial", "unchecked",
"UnnecessaryParentheses", "UnusedNestedClass", "UnusedVariable", "YodaCondition"})
public class ConcurrentHashMapV7<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
private static final long serialVersionUID = 7249069246763182397L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,7 @@ void afterWrite(Runnable task) {
} finally {
evictionLock.unlock();
}
rescheduleCleanUpIfIncomplete();
}

/** Acquires the eviction lock. */
Expand Down Expand Up @@ -1638,8 +1639,41 @@ void performCleanUp(@Nullable Runnable task) {
} finally {
evictionLock.unlock();
}
if ((drainStatusOpaque() == REQUIRED) && (executor == ForkJoinPool.commonPool())) {
rescheduleCleanUpIfIncomplete();
}

/**
 * If pending operations remain that were not handled by the prior clean up, then tries to
 * schedule an asynchronous maintenance task. This may occur due to a concurrent write after the
 * maintenance work had started or if the amortized threshold of work per clean up was reached.
 */
void rescheduleCleanUpIfIncomplete() {
  // Cheap opaque read first; only a REQUIRED status indicates unfinished maintenance work.
  if (drainStatusOpaque() != REQUIRED) {
    return;
  }

  // An immediate scheduling cannot be performed on a custom executor because it may use a
  // caller-runs policy. This could cause the caller's penalty to exceed the amortized threshold,
  // e.g. repeated concurrent writes could result in a retry loop.
  if (executor == ForkJoinPool.commonPool()) {
    scheduleDrainBuffers();
    return;
  }

  // If a scheduler was configured then the maintenance can be deferred onto the custom executor
  // to be run some time into the future. This is only leveraged if a run was not already
  // scheduled by a pending expiration event. Otherwise, rely on other cache activity to trigger
  // the next run. The pacer's future and the drain status are re-checked while holding the
  // eviction lock to avoid racing with a concurrent scheduling or clean up; tryLock is used so
  // that the caller never blocks waiting on an in-flight maintenance cycle.
  var pacer = pacer();
  if ((pacer != null) && (pacer.future == null) && evictionLock.tryLock()) {
    try {
      if ((pacer.future == null) && (drainStatusOpaque() == REQUIRED)) {
        pacer.schedule(executor, drainBuffersTask,
            expirationTicker().read(), Pacer.TOLERANCE);
      }
    } finally {
      evictionLock.unlock();
    }
  }
}

Expand Down Expand Up @@ -3123,6 +3157,7 @@ <T> T snapshot(Iterable<Node<K, V>> iterable, Function<V, V> transformer,
}
} finally {
evictionLock.unlock();
rescheduleCleanUpIfIncomplete();
}
}

Expand Down Expand Up @@ -4029,19 +4064,27 @@ final class BoundedEviction implements Eviction<K, V> {
if (cache.evicts() && isWeighted()) {
cache.evictionLock.lock();
try {
if (cache.drainStatusOpaque() == REQUIRED) {
cache.maintenance(/* ignored */ null);
}
return OptionalLong.of(Math.max(0, cache.weightedSize()));
} finally {
cache.evictionLock.unlock();
cache.rescheduleCleanUpIfIncomplete();
}
}
return OptionalLong.empty();
}
@Override public long getMaximum() {
cache.evictionLock.lock();
try {
if (cache.drainStatusOpaque() == REQUIRED) {
cache.maintenance(/* ignored */ null);
}
return cache.maximum();
} finally {
cache.evictionLock.unlock();
cache.rescheduleCleanUpIfIncomplete();
}
}
@Override public void setMaximum(long maximum) {
Expand All @@ -4051,6 +4094,7 @@ final class BoundedEviction implements Eviction<K, V> {
cache.maintenance(/* ignored */ null);
} finally {
cache.evictionLock.unlock();
cache.rescheduleCleanUpIfIncomplete();
}
}
@Override public Map<K, V> coldest(int limit) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static uk.org.lidalia.slf4jext.ConventionalLevelHierarchy.WARN_LEVELS;
Expand Down Expand Up @@ -310,6 +312,87 @@ public void shouldDrainBuffers_invalidDrainStatus() {
}
}

/** A clean up must not be rescheduled when the drain status says none is pending. */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO)
public void rescheduleCleanUpIfIncomplete_complete(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  reset(context.scheduler());
  // Every non-REQUIRED status is a no-op: no scheduler call, status left untouched
  for (int status : new int[] { IDLE, PROCESSING_TO_IDLE, PROCESSING_TO_REQUIRED}) {
    cache.drainStatus = status;
    cache.rescheduleCleanUpIfIncomplete();
    verifyNoInteractions(context.scheduler());
    assertThat(cache.drainStatus).isEqualTo(status);
    assertThat(context.executor().completed()).isEqualTo(context.population().size());
  }
}

/**
 * Without a scheduler (and with the CacheSpec default, non-commonPool executor — presumably a
 * same-thread executor; confirm against CacheSpec defaults), the clean up cannot be
 * rescheduled and the status stays REQUIRED.
 */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY)
public void rescheduleCleanUpIfIncomplete_incompatible(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  cache.drainStatus = REQUIRED;
  cache.rescheduleCleanUpIfIncomplete();
  assertThat(cache.drainStatus).isEqualTo(REQUIRED);
  assertThat(context.executor().completed()).isEqualTo(context.population().size());
}

/**
 * On the common pool (CacheExecutor.DEFAULT) the drain is submitted immediately, so the status
 * asynchronously transitions to IDLE once the maintenance task runs.
 */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, executor = CacheExecutor.DEFAULT)
public void rescheduleCleanUpIfIncomplete_immediate(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  cache.drainStatus = REQUIRED;
  cache.rescheduleCleanUpIfIncomplete();
  await().until(() -> cache.drainStatus, is(IDLE));
}

/**
 * If the pacer already holds a pending future then a new task must not be scheduled, deferring
 * to the in-flight expiration event instead.
 */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO)
public void rescheduleCleanUpIfIncomplete_notScheduled_future(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  reset(context.scheduler());
  cache.drainStatus = REQUIRED;
  // Simulate an already-scheduled expiration event
  cache.pacer().future = DisabledFuture.INSTANCE;

  cache.rescheduleCleanUpIfIncomplete();
  verifyNoInteractions(context.scheduler());
  await().until(() -> cache.drainStatus, is(REQUIRED));
}

/**
 * While another thread holds the eviction lock the tryLock fails, so the rescheduling is a
 * no-op rather than blocking the caller.
 */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO)
public void rescheduleCleanUpIfIncomplete_notScheduled_locked(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  reset(context.scheduler());
  cache.drainStatus = REQUIRED;
  cache.pacer().cancel();

  var done = new AtomicBoolean();
  // Hold the eviction lock on this thread while a second thread attempts the reschedule
  cache.evictionLock.lock();
  try {
    ConcurrentTestHarness.execute(() -> {
      cache.rescheduleCleanUpIfIncomplete();
      done.set(true);
    });
    await().untilTrue(done);
    verifyNoInteractions(context.scheduler());
    await().until(() -> cache.drainStatus, is(REQUIRED));
  } finally {
    cache.evictionLock.unlock();
  }
}

/** With an idle pacer and a REQUIRED status, a deferred maintenance run must be scheduled. */
@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY, scheduler = CacheScheduler.MOCKITO)
public void rescheduleCleanUpIfIncomplete_scheduled(
    BoundedLocalCache<Int, Int> cache, CacheContext context) {
  reset(context.scheduler());
  cache.drainStatus = REQUIRED;
  // Ensure no future is pending so the reschedule path is taken
  cache.pacer().cancel();

  cache.rescheduleCleanUpIfIncomplete();
  verify(context.scheduler()).schedule(any(), any(), anyLong(), any());
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.EMPTY)
public void afterWrite_drainFullWriteBuffer(
Expand Down Expand Up @@ -382,99 +465,117 @@ public void afterWrite_exception() {
assertThat(event.getLevel()).isEqualTo(ERROR);
}

/**
 * Reading the weighted size should run any required maintenance under the eviction lock,
 * leaving the drain status IDLE afterwards.
 */
@Test(dataProvider = "caches")
@CacheSpec(maximumSize = Maximum.FULL, weigher = CacheWeigher.TEN)
public void weightedSize_maintenance(BoundedLocalCache<Int, Int> cache,
    CacheContext context, Eviction<Int, Int> eviction) {
  cache.drainStatus = REQUIRED;
  eviction.weightedSize();
  assertThat(cache.drainStatus).isEqualTo(IDLE);
}

/**
 * Reading the maximum size should run any required maintenance under the eviction lock,
 * leaving the drain status IDLE afterwards.
 */
@Test(dataProvider = "caches")
@CacheSpec(maximumSize = Maximum.FULL)
public void maximumSize_maintenance(BoundedLocalCache<Int, Int> cache,
    CacheContext context, Eviction<Int, Int> eviction) {
  cache.drainStatus = REQUIRED;
  eviction.getMaximum();
  assertThat(cache.drainStatus).isEqualTo(IDLE);
}

/* --------------- Eviction --------------- */

@Test(dataProvider = "caches")
@CacheSpec(maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.MAX_VALUE)
public void overflow_add_one(BoundedLocalCache<Int, Int> map, CacheContext context) {
long actualWeight = map.weightedSize();
map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
map.put(context.absentKey(), context.absentValue());
public void overflow_add_one(BoundedLocalCache<Int, Int> cache, CacheContext context) {
long actualWeight = cache.weightedSize();
cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
cache.put(context.absentKey(), context.absentValue());

assertThat(map).hasSize(context.initialSize());
assertThat(map.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY);
assertThat(cache).hasSize(context.initialSize());
assertThat(cache.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY);

var removed = new HashMap<>(context.original());
removed.put(context.absentKey(), context.absentValue());
removed.keySet().removeAll(map.keySet());
removed.keySet().removeAll(cache.keySet());
assertThat(context).notifications().hasSize(1);
assertThat(context).notifications().withCause(SIZE).contains(removed).exclusively();

// reset for validation listener
map.setWeightedSize(actualWeight);
cache.setWeightedSize(actualWeight);
}

@Test(dataProvider = "caches")
@CacheSpec(maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.MAX_VALUE)
public void overflow_add_many(BoundedLocalCache<Int, Int> map, CacheContext context) {
long actualWeight = map.weightedSize();
map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
map.evictionLock.lock();
public void overflow_add_many(BoundedLocalCache<Int, Int> cache, CacheContext context) {
long actualWeight = cache.weightedSize();
cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
cache.evictionLock.lock();
try {
map.putAll(context.absent());
cache.putAll(context.absent());
} finally {
map.evictionLock.unlock();
cache.evictionLock.unlock();
}
map.cleanUp();
cache.cleanUp();

assertThat(map).hasSize(context.initialSize());
assertThat(map.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY);
assertThat(cache).hasSize(context.initialSize());
assertThat(cache.weightedSize()).isEqualTo(BoundedLocalCache.MAXIMUM_CAPACITY);

var removed = new HashMap<>(context.original());
removed.putAll(context.absent());
removed.keySet().removeAll(map.keySet());
removed.keySet().removeAll(cache.keySet());
assertThat(context).notifications().hasSize(context.absent().size());
assertThat(context).notifications().withCause(SIZE).contains(removed).exclusively();

// reset for validation listener
map.setWeightedSize(actualWeight);
cache.setWeightedSize(actualWeight);
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL,
maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.VALUE)
public void overflow_update_one(BoundedLocalCache<Int, Int> map, CacheContext context) {
map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
map.put(context.firstKey(), Int.MAX_VALUE);
public void overflow_update_one(BoundedLocalCache<Int, Int> cache, CacheContext context) {
cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
cache.put(context.firstKey(), Int.MAX_VALUE);

assertThat(map).hasSizeLessThan(1 + context.initialSize());
assertThat(map.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY);
assertThat(cache).hasSizeLessThan(1 + context.initialSize());
assertThat(cache.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY);

var removed = new HashMap<>(context.original());
removed.put(context.firstKey(), Int.MAX_VALUE);
removed.keySet().removeAll(map.keySet());
removed.keySet().removeAll(cache.keySet());
assertThat(removed.size()).isAtLeast(1);
assertThat(context).notifications().withCause(SIZE).contains(removed);

// reset for validation listener
map.setWeightedSize(map.data.values().stream().mapToLong(Node::getWeight).sum());
cache.setWeightedSize(cache.data.values().stream().mapToLong(Node::getWeight).sum());
}

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL,
maximumSize = Maximum.UNREACHABLE, weigher = CacheWeigher.VALUE)
public void overflow_update_many(BoundedLocalCache<Int, Int> map, CacheContext context) {
public void overflow_update_many(BoundedLocalCache<Int, Int> cache, CacheContext context) {
var updated = Maps.asMap(context.firstMiddleLastKeys(), key -> Int.MAX_VALUE);
map.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
map.evictionLock.lock();
cache.setWeightedSize(BoundedLocalCache.MAXIMUM_CAPACITY);
cache.evictionLock.lock();
try {
map.putAll(updated);
cache.putAll(updated);
} finally {
map.evictionLock.unlock();
cache.evictionLock.unlock();
}
map.cleanUp();
cache.cleanUp();

assertThat(map).hasSizeLessThan(1 + context.initialSize());
assertThat(map.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY);
assertThat(cache).hasSizeLessThan(1 + context.initialSize());
assertThat(cache.weightedSize()).isAtMost(BoundedLocalCache.MAXIMUM_CAPACITY);

var removed = new HashMap<>(context.original());
removed.putAll(updated);
removed.keySet().removeAll(map.keySet());
removed.keySet().removeAll(cache.keySet());
assertThat(removed.size()).isAtLeast(1);
assertThat(context).notifications().withCause(SIZE).contains(removed);

// reset for validation listener
map.setWeightedSize(map.data.values().stream().mapToLong(Node::getWeight).sum());
cache.setWeightedSize(cache.data.values().stream().mapToLong(Node::getWeight).sum());
}

@CheckNoEvictions
Expand Down
Loading

0 comments on commit 18133a7

Please sign in to comment.