Skip to content

Commit

Permalink
Improve robustness in racy scenarios (fixes #568)
Browse files Browse the repository at this point in the history
1. When an entry is updated then a concurrent reader should observe
either the old or new value. This operation replaces the j.l.Reference
instance stored on the entry and the old referent becomes eligible for
garbage collection. A reader holding the stale Reference may therefore
return a null value, which is more likely due to the cache proactively
clearing the referent to assist the garbage collector.

When a null value is read then an extra volatile read is used to
validate that the Reference instance is still held by the entry. This
retry loop has negligible cost.

2. When an entry is eligible for removal due to its value being garbage
collected, then during the eviction's atomic map operation this
eligibility must be verified. If concurrently the entry was resurrected
and a new value set, then the cache writer has already dispatched the
removal notification and established a live mapping. If the evictor does
not detect that the cause is no longer valid, then it would incorrectly
discard the mapping with a removal notification containing a non-null
key, non-null value, and collected removal cause.

Like expiration and size policies, the reference eviction policy will
now validate and no-op if the entry is no longer eligible.

3. When the fixed expiration setting is dynamically adjusted, an
expired entry may be resurrected as no longer eligible for removal.
While the map operation detected this case, stemming from the entry
itself being updated and its lifetime reset, the outer eviction loop
could retry indefinitely due to a stale read of the fixed duration.
This caused the loop to retry the ineligible entry, but instead it
can terminate when eviction fails because it scans a queue ordered
by the expiration timestamp.

Co-authored-by: Justin Horvitz <[email protected]>
  • Loading branch information
ben-manes and justinhorvitz committed Jul 1, 2021
1 parent 1760b9c commit a8fda83
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import javax.lang.model.element.Modifier;

import com.squareup.javapoet.CodeBlock;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;

Expand All @@ -47,7 +48,7 @@ protected void execute() {
context.nodeSubtype
.addField(newFieldOffset(context.className, "value"))
.addField(newValueField())
.addMethod(newGetter(valueStrength(), vTypeVar, "value", Visibility.LAZY))
.addMethod(makeGetValue())
.addMethod(newGetRef("value"))
.addMethod(makeSetValue())
.addMethod(makeContainsValue());
Expand All @@ -60,6 +61,29 @@ private FieldSpec newValueField() {
return fieldSpec.build();
}

/** Creates the getValue method. */
private MethodSpec makeGetValue() {
MethodSpec.Builder getter = MethodSpec.methodBuilder("getValue")
.addModifiers(context.publicFinalModifiers())
.returns(vTypeVar);
if (valueStrength() == Strength.STRONG) {
getter.addStatement("return value");
return getter.build();
}

CodeBlock code = CodeBlock.builder()
.beginControlFlow("for (;;)")
.addStatement("$1T<V> ref = ($1T<V>) $2T.UNSAFE.getObject(this, $3N)",
Reference.class, UNSAFE_ACCESS, offsetName("value"))
.addStatement("V referent = ref.get()")
.beginControlFlow("if ((referent != null) || (ref == value))")
.addStatement("return referent")
.endControlFlow()
.endControlFlow()
.build();
return getter.addCode(code).build();
}

/** Creates the setValue method. */
private MethodSpec makeSetValue() {
MethodSpec.Builder setter = MethodSpec.methodBuilder("setValue")
Expand All @@ -71,9 +95,10 @@ private MethodSpec makeSetValue() {
setter.addStatement("$T.UNSAFE.putObject(this, $N, $N)",
UNSAFE_ACCESS, offsetName("value"), "value");
} else {
setter.addStatement("(($T<V>) getValueReference()).clear()", Reference.class);
setter.addStatement("$1T<V> ref = ($1T<V>) getValueReference()", Reference.class);
setter.addStatement("$T.UNSAFE.putObject(this, $N, new $T($L, $N, referenceQueue))",
UNSAFE_ACCESS, offsetName("value"), valueReferenceType(), "getKeyReference()", "value");
setter.addStatement("ref.clear()");
}

return setter.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,10 @@ void expireAfterAccessEntries(AccessOrderDeque<Node<K, V>> accessOrderDeque, lon
long duration = expiresAfterAccessNanos();
for (;;) {
Node<K, V> node = accessOrderDeque.peekFirst();
if ((node == null) || ((now - node.getAccessTime()) < duration)) {
if ((node == null) || ((now - node.getAccessTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
return;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand All @@ -837,11 +837,11 @@ void expireAfterWriteEntries(long now) {
}
long duration = expiresAfterWriteNanos();
for (;;) {
final Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)) {
Node<K, V> node = writeOrderDeque().peekFirst();
if ((node == null) || ((now - node.getWriteTime()) < duration)
|| !evictEntry(node, RemovalCause.EXPIRED, now)) {
break;
}
evictEntry(node, RemovalCause.EXPIRED, now);
}
}

Expand Down Expand Up @@ -894,8 +894,8 @@ boolean hasExpired(Node<K, V> node, long now) {
}

/**
* Attempts to evict the entry based on the given removal cause. A removal due to expiration or
* size may be ignored if the entry was updated and is no longer eligible for eviction.
* Attempts to evict the entry based on the given removal cause. A removal due to may be ignored
* if the entry was updated and is no longer eligible for eviction.
*
* @param node the entry to evict
* @param cause the reason to evict
Expand All @@ -919,7 +919,15 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
synchronized (n) {
value[0] = n.getValue();

actualCause[0] = (key == null) || (value[0] == null) ? RemovalCause.COLLECTED : cause;
if ((key == null) || (value[0] == null)) {
actualCause[0] = RemovalCause.COLLECTED;
} else if (cause == RemovalCause.COLLECTED) {
resurrect[0] = true;
return n;
} else {
actualCause[0] = cause;
}

if (actualCause[0] == RemovalCause.EXPIRED) {
boolean expired = false;
if (expiresAfterAccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.Thread.State;
import java.lang.ref.Reference;
import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import org.mockito.Mockito;
Expand Down Expand Up @@ -454,6 +459,239 @@ public void clear_update() {
await().untilAtomic(removedValues, is(oldValue + newValue));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, values = {ReferenceType.WEAK, ReferenceType.SOFT},
removalListener = Listener.CONSUMING)
public void evict_resurrect_collected(Cache<Integer, Integer> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
Integer oldValue = Integer.valueOf(2);
Integer newValue = Integer.valueOf(3);
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);

cache.put(key, oldValue);
Node<Integer, Integer> node = localCache.data.get(
localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue()));
@SuppressWarnings("unchecked")
Reference<Integer> ref = (Reference<Integer>) node.getValueReference();
ref.enqueue();
ref.clear();

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
cache.asMap().compute(key, (k, v) -> {
assertThat(v, is(nullValue()));
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});
await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));

return newValue;
});
await().untilTrue(done);

assertThat(node.getValue(), is(newValue));
assertThat(context.removalNotifications(), is(equalTo(ImmutableList.of(
new RemovalNotification<>(key, null, RemovalCause.COLLECTED)))));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, maximumSize = Maximum.UNREACHABLE,
weigher = CacheWeigher.COLLECTION)
public void evict_resurrect_weight(Cache<Integer, List<Integer>> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
cache.put(key, ImmutableList.of(key));

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().eviction().get().setMaximum(0);
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));

return ImmutableList.of();
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(ImmutableList.of()));
verifyRemovalListener(context, verifier -> verifier.hasCount(0, RemovalCause.SIZE));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, mustExpireWithAnyOf = {AFTER_ACCESS, AFTER_WRITE},
expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE},
expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE})
public void evict_resurrect_expireAfter(Cache<Integer, Integer> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
cache.put(key, key);

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
context.ticker().advance(Duration.ofHours(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
return -key;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(-key));
verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterAccess = Expire.FOREVER)
public void evict_resurrect_expireAfterAccess(
Cache<Integer, Integer> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
cache.put(key, key);

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ZERO);
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireAfterAccess().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterWrite = Expire.FOREVER)
public void evict_resurrect_expireAfterWrite(
Cache<Integer, Integer> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
cache.put(key, key);

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
context.ticker().advance(Duration.ofMinutes(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ZERO);
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireAfterWrite().get().setExpiresAfter(Duration.ofHours(1));
return v;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expireAfterWrite = Expire.ONE_MINUTE)
public void evict_resurrect_expireAfterWrite_entry(
Cache<Integer, Integer> cache, CacheContext context) {
Integer key = Integer.valueOf(1);
cache.put(key, key);

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
context.ticker().advance(Duration.ofHours(1));
cache.asMap().compute(key, (k, v) -> {
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
return -key;
});
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(-key));
verifyRemovalListener(context, verifier -> verifier.hasOnly(1, RemovalCause.EXPIRED));
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.EMPTY, expiry = CacheExpiry.CREATE, expiryTime = Expire.ONE_MINUTE)
public void evict_resurrect_expireAfterVar(Cache<Integer, Integer> cache, CacheContext context) {
BoundedLocalCache<Integer, Integer> localCache = asBoundedLocalCache(cache);
Integer key = Integer.valueOf(1);
cache.put(key, key);
Node<Integer, Integer> node = localCache.data.get(
localCache.nodeFactory.newReferenceKey(key, localCache.keyReferenceQueue()));

AtomicBoolean started = new AtomicBoolean();
AtomicBoolean done = new AtomicBoolean();
AtomicReference<Thread> evictor = new AtomicReference<>();
synchronized (node) {
context.ticker().advance(Duration.ofHours(1));
ConcurrentTestHarness.execute(() -> {
evictor.set(Thread.currentThread());
started.set(true);
cache.cleanUp();
done.set(true);
});

await().untilTrue(started);
EnumSet<State> threadState = EnumSet.of(State.BLOCKED, State.WAITING);
await().until(() -> threadState.contains(evictor.get().getState()));
cache.policy().expireVariably().get().setExpiresAfter(key, Duration.ofDays(1));
}
await().untilTrue(done);

assertThat(cache.getIfPresent(key), is(key));
verifyRemovalListener(context, verifier -> verifier.noInteractions());
}

@Test(dataProvider = "caches")
@CacheSpec(compute = Compute.SYNC, implementation = Implementation.Caffeine,
population = Population.FULL, maximumSize = Maximum.FULL)
Expand Down
Loading

0 comments on commit a8fda83

Please sign in to comment.