Skip to content

Commit

Permalink
Improve robustness in racy scenarios (fixes #568)
Browse files Browse the repository at this point in the history
1. When an entry is updated then a concurrent reader should observe
either the old or new value. This operation replaces the j.l.Reference
instance stored on the entry and the old referent becomes eligible for
garbage collection. A reader holding the stale Reference may therefore
return a null value, which is more likely due to the cache proactively
clearing the referent to assist the garbage collector.

When a null value is read then an extra volatile read is used to
validate that the Reference instance is still held by the entry. This
retry loop has negligible cost.

2. When an entry is eligible for removal due to its value being garbage
collected, then during the eviction's atomic map operation this
eligibility must be verified. If concurrently the entry was resurrected
and a new value set, then the cache writer has already dispatched the
removal notification and established a live mapping. If the evictor does
not detect that the cause is no longer valid, then it would incorrectly
discard the mapping with a removal notification containing a non-null
key, non-null value, and collected removal cause.

Like expiration and size policies, the reference eviction policy will
now validate and no-op if the entry is no longer eligible.

3. When the fixed expiration setting is dynamically adjusted, an
expired entry may be resurrected as no longer eligible for removal.
While the map operation detected this case, stemming from the entry
itself being updated and its lifetime reset, the outer eviction loop
could retry indefinitely due to a stale read of the fixed duration.
This caused the loop to retry the ineligible entry, but instead it
can terminate when eviction fails because it scans a queue ordered
by the expiration timestamp.

Co-authored-by: Justin Horvitz <[email protected]>
  • Loading branch information
ben-manes and justinhorvitz committed Jul 1, 2021
1 parent f279485 commit 9861702
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.lang.model.element.Modifier;

import com.squareup.javapoet.ClassName;
import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;

Expand All @@ -44,7 +45,7 @@ protected boolean applies() {
protected void execute() {
context.nodeSubtype
.addField(newValueField())
.addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.PLAIN))
.addMethod(makeGetValue())
.addMethod(newGetRef("value"))
.addMethod(makeSetValue())
.addMethod(makeContainsValue());
Expand All @@ -60,6 +61,30 @@ private FieldSpec newValueField() {
return fieldSpec.build();
}

/** Creates the getValue method. */
private MethodSpec makeGetValue() {
MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue")
.addModifiers(context.publicFinalModifiers())
.returns(vTypeVar);
if (valueStrength() == Strength.STRONG) {
getter.addStatement("return ($T) $L.get(this)", vTypeVar, varHandleName("value"));
return getter.build();
}

CodeBlock code = CodeBlock.builder()
.beginControlFlow("for (;;)")
.addStatement("$1T<V> ref = ($1T<V>) $2L.get(this)",
Reference.class, varHandleName("value"))
.addStatement("V value = ref.get()")
.beginControlFlow(
"if ((value != null) || (ref == $L.getVolatile(this)))", varHandleName("value"))
.addStatement("return value")
.endControlFlow()
.endControlFlow()
.build();
return getter.addCode(code).build();
}

/** Creates the setValue method. */
private MethodSpec makeSetValue() {
MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue")
Expand All @@ -70,9 +95,10 @@ private MethodSpec makeSetValue() {
if (isStrongValues()) {
setter.addStatement("$L.set(this, $N)", varHandleName("value"), "value");
} else {
setter.addStatement("(($T<V>) getValueReference()).clear()", Reference.class);
setter.addStatement("$1T<V> ref = ($1T<V>) getValueReference()", Reference.class);
setter.addStatement("$L.set(this, new $T($L, $N, referenceQueue))",
varHandleName("value"), valueReferenceType(), "getKeyReference()", "value");
setter.addStatement("ref.clear()");
}

return setter.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -870,10 +870,10 @@ void expireAfterAccessEntries(AccessOrderDeque<Node<K, V>> accessOrderDeque, lon
long duration = expiresAfterAccessNanos();
for (;;) {
Node<K, V> node = accessOrderDeque.peekFirst();
if ((node == null) || ((now - node.getAccessTime()) < duration)) {
if ((node == null) || ((now - node.getAccessTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
return;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand All @@ -885,11 +885,11 @@ void expireAfterWriteEntries(long now) {
}
long duration = expiresAfterWriteNanos();
for (;;) {
final Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)) {
Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
break;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand Down Expand Up @@ -942,8 +942,8 @@ boolean hasExpired(Node<K, V> node, long now) {
}

/**
* Attempts to evict the entry based on the given removal cause. A removal due to expiration or
* size may be ignored if the entry was updated and is no longer eligible for eviction.
* Attempts to evict the entry based on the given removal cause. A removal due to may be ignored
* if the entry was updated and is no longer eligible for eviction.
*
* @param node the entry to evict
* @param cause the reason to evict
Expand All @@ -958,8 +958,8 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
V[] value = (V[]) new Object[1];
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause[] actualCause = new RemovalCause[1];
Object keyReference = node.getKeyReference();
RemovalCause[] actualCause = new RemovalCause[1];

data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
Expand All @@ -968,7 +968,15 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
synchronized (n) {
value[0] = n.getValue();

actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause;
if ((key == null) || (value[0] == null)) {
actualCause[0] = RemovalCause.COLLECTED;
} else if (cause == RemovalCause.COLLECTED) {
resurrect[0] = true;
return n;
} else {
actualCause[0] = cause;
}

if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.Thread.State;
import java.lang.ref.Reference;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -409,6 +413,232 @@ public void evict_update_entryTooBig_protected(Cache<Int, Int> cache, CacheConte
Int.valueOf(1), Int.valueOf(20), RemovalCause.SIZE)));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT},
removalListener = Listener.CONSUMING)
public void evict_resurrect_collected(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
Int oldValue = Int.valueOf(2);
Int newValue = Int.valueOf(3);
var localCache = asBoundedLocalCache(cache);

cache.put(key, oldValue);
var node = localCache.data.get(localCache.referenceKey(key));
@SuppressWarnings("unchecked")
var ref = (Reference<Int>) node.getValueReference();
ref.enqueue();

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
cache.asMap().compute(key, (k, v) -> {
assertThat(v, is(nullValue()));
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});
await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));

return newValue;
});
await().untilTrue(done);

assertThat(node.getValue(), is(newValue));
assertThat(context.removalNotifications(), is(equalTo(List.of(
new RemovalNotification<>(key, null, RemovalCause.COLLECTED)))));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE,
weigher = CacheWeigher.COLLECTION)
public void evict_resurrect_weight(Cache<Int, List<Int>> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, List.of(key));

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().eviction().get().setMaximum(0);
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));

return List.of();
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(List.of()));
verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, mustExpireWithAnyOf = {AFTER_ACCESS, AFTER_WRITE},
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void evict_resurrect_expireAfter(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
context.ticker().advance(Duration.ofHours(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
return key.negate();
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key.negate()));
verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterAccess = Expire.FOREVER)
public void evict_resurrect_expireAfterAccess(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO);
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterWrite = Expire.FOREVER)
public void evict_resurrect_expireAfterWrite(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO);
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterWrite = Expire.ONE_MINUTE)
public void evict_resurrect_expireAfterWrite_entry(Cache<Int, Int> cache, CacheContext context) {
Int key = Int.valueOf(1);
cache.put(key, key);

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
context.ticker().advance(Duration.ofHours(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
return key.negate();
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key.negate()));
verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE)
public void evict_resurrect_expireAfterVar(Cache<Int, Int> cache, CacheContext context) {
var localCache = asBoundedLocalCache(cache);
Int key = Int.valueOf(1);
cache.put(key, key);
var node = localCache.data.get(localCache.referenceKey(key));

var started = new AtomicBoolean();
var done = new AtomicBoolean();
var evictor = new AtomicReference<Thread>();
synchronized (node) {
context.ticker().advance(Duration.ofHours(1));
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
var threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1));
}
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}
@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.FULL, maximumSize = Maximum.FULL)
Expand Down
Loading

0 comments on commit 9861702

Please sign in to comment.