Skip to content

Commit

Permalink
Improved refresh linearizability (#193)
Browse files Browse the repository at this point in the history
An in-flight refresh is now discarded for any write to the entry, such
as an update, removal, or eviction.

In the case where the entry was removed and the refresh was discarded,
the removal cause is now "explicit" instead of "replaced" as there is
no mapping. Guava and previous releases would populate the cache, or
at best used "replaced" as the cause even if it was not accurate.

The resulting behavior is more pessimistic by trying to avoid a refresh
from populating the cache with stale data. In practice this should have
little to no penalty.
  • Loading branch information
ben-manes committed Jan 10, 2021
1 parent 722faa1 commit 143137a
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -210,6 +209,8 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
static final long EXPIRE_WRITE_TOLERANCE = TimeUnit.SECONDS.toNanos(1);
/** The maximum duration before an entry expires. */
static final long MAXIMUM_EXPIRY = (Long.MAX_VALUE >> 1); // 150 years
/** The handle for the in-flight refresh operations. */
static final VarHandle REFRESHES;

final ConcurrentHashMap<Object, Node<K, V>> data;
@Nullable final CacheLoader<K, V> cacheLoader;
Expand All @@ -226,7 +227,7 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
@Nullable Set<K> keySet;
@Nullable Collection<V> values;
@Nullable Set<Entry<K, V>> entrySet;
AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;
@Nullable volatile ConcurrentMap<Object, CompletableFuture<?>> refreshes;

/** Creates an instance based on the builder's configuration. */
protected BoundedLocalCache(Caffeine<K, V> builder,
Expand All @@ -236,7 +237,6 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
executor = builder.getExecutor();
writer = builder.getCacheWriter();
evictionLock = new ReentrantLock();
refreshes = new AtomicReference<>();
weigher = builder.getWeigher(isAsync);
drainBuffersTask = new PerformCleanupTask(this);
nodeFactory = NodeFactory.newFactory(builder, isAsync);
Expand All @@ -251,6 +251,15 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
}
}

static {
try {
REFRESHES = MethodHandles.lookup()
.findVarHandle(BoundedLocalCache.class, "refreshes", ConcurrentMap.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

/* --------------- Shared --------------- */

/** Returns if the node's value is currently being computed, asynchronously. */
Expand Down Expand Up @@ -295,16 +304,24 @@ public final Executor executor() {
@Override
@SuppressWarnings("NullAway")
public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
var pending = refreshes.get();
var pending = refreshes;
if (pending == null) {
pending = new ConcurrentHashMap<>();
if (!refreshes.compareAndSet(null, pending)) {
pending = refreshes.get();
if (!REFRESHES.compareAndSet(this, null, pending)) {
pending = refreshes;
}
}
return pending;
}

/** Invalidate the in-flight refresh. */
void discardRefresh(Object keyReference) {
var pending = refreshes;
if (pending != null) {
pending.remove(keyReference);
}
}

/** Returns whether this cache notifies a writer when an entry is modified. */
protected boolean hasWriter() {
return (writer != CacheWriter.disabledWriter());
Expand Down Expand Up @@ -959,6 +976,7 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
}
makeDead(n);
}
discardRefresh(keyReference);
removed[0] = true;
return null;
});
Expand Down Expand Up @@ -988,12 +1006,6 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {

if (removed[0]) {
statsCounter().recordEviction(node.getWeight(), actualCause[0]);

var pending = refreshes.get();
if (pending != null) {
pending.remove(keyReference);
}

if (hasRemovalListener()) {
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safe guard against the executor rejecting the notification task.
Expand Down Expand Up @@ -1819,12 +1831,8 @@ public void clear() {
}

// Discard all entries
var pending = refreshes.get();
for (var entry : data.entrySet()) {
removeNode(entry.getValue(), now);
if (pending != null) {
pending.remove(entry.getKey());
}
}

// Discard all pending reads
Expand Down Expand Up @@ -1860,6 +1868,8 @@ void removeNode(Node<K, V> node, long now) {
if (key != null) {
writer.delete(key, value[0], cause[0]);
}

discardRefresh(node.getKeyReference());
makeDead(n);
return null;
}
Expand Down Expand Up @@ -2035,6 +2045,7 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
Node<K, V> computed = node;
prior = data.computeIfAbsent(node.getKeyReference(), k -> {
writer.write(key, value);
discardRefresh(k);
return computed;
});
if (prior == node) {
Expand All @@ -2059,6 +2070,8 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
afterRead(prior, now, /* recordHit */ false);
return currentValue;
}
} else {
discardRefresh(prior.getKeyReference());
}

V oldValue;
Expand Down Expand Up @@ -2155,16 +2168,12 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
writer.delete(castKey, oldValue[0], cause[0]);
n.retire();
}
discardRefresh(lookupKey);
node[0] = n;
return null;
});

if (cause[0] != null) {
var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}

afterWrite(new RemovalTask(node[0]));
if (hasRemovalListener()) {
notifyRemoval(castKey, oldValue[0], cause[0]);
Expand Down Expand Up @@ -2203,6 +2212,7 @@ public boolean remove(Object key, Object value) {
return node;
}
writer.delete(oldKey[0], oldValue[0], cause[0]);
discardRefresh(lookupKey);
removed[0] = node;
node.retire();
return null;
Expand All @@ -2213,10 +2223,6 @@ public boolean remove(Object key, Object value) {
return false;
}

var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}
if (hasRemovalListener()) {
notifyRemoval(oldKey[0], oldValue[0], cause[0]);
}
Expand Down Expand Up @@ -2257,6 +2263,7 @@ public boolean remove(Object key, Object value) {
setVariableTime(n, varTime);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
discardRefresh(k);
return n;
}
});
Expand Down Expand Up @@ -2313,6 +2320,7 @@ public boolean replace(K key, V oldValue, V newValue) {
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
replaced[0] = true;
discardRefresh(k);
}
return n;
});
Expand Down Expand Up @@ -2435,6 +2443,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
discardRefresh(k);
return n;
}
});
Expand Down Expand Up @@ -2591,6 +2600,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
if (newValue[0] == null) {
if (cause[0] == null) {
cause[0] = RemovalCause.EXPLICIT;
discardRefresh(kr);
}
removed[0] = n;
n.retire();
Expand All @@ -2612,6 +2622,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
n.setWeight(weight[1]);
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
discardRefresh(kr);
return n;
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,45 +209,46 @@ public CompletableFuture<V> refresh(K key) {
var castedFuture = (CompletableFuture<V>) future;
if (refreshed[0]) {
castedFuture.whenComplete((newValue, error) -> {
boolean removed = asyncCache.cache().refreshes().remove(keyReference, castedFuture);
long loadTime = asyncCache.cache().statsTicker().read() - startTime[0];
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
asyncCache.cache().refreshes().remove(keyReference, castedFuture);
asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
return;
}

boolean[] discard = new boolean[1];
asyncCache.cache().compute(key, (ignored, currentValue) -> {
if (currentValue == null) {
if (newValue == null) {
return null;
} else if (asyncCache.cache().refreshes().get(key) == castedFuture) {
return castedFuture;
}
} else if (currentValue == oldValueFuture[0]) {
long expectedWriteTime = writeTime[0];
if (asyncCache.cache().hasWriteTime()) {
asyncCache.cache().getIfPresentQuietly(key, writeTime);
}
if (writeTime[0] == expectedWriteTime) {
return (newValue == null) ? null : castedFuture;
var value = asyncCache.cache().compute(key, (ignored, currentValue) -> {
if (currentValue == oldValueFuture[0]) {
if (currentValue == null) {
if (newValue == null) {
return null;
} else if (removed) {
return castedFuture;
}
} else {
long expectedWriteTime = writeTime[0];
if (asyncCache.cache().hasWriteTime()) {
asyncCache.cache().getIfPresentQuietly(key, writeTime);
}
if (writeTime[0] == expectedWriteTime) {
return (newValue == null) ? null : castedFuture;
}
}
}
discard[0] = true;
return currentValue;
}, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && asyncCache.cache().hasRemovalListener()) {
asyncCache.cache().notifyRemoval(key, castedFuture, RemovalCause.REPLACED);
if (discard[0] && (newValue != null) && asyncCache.cache().hasRemovalListener()) {
var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
asyncCache.cache().notifyRemoval(key, castedFuture, cause);
}
if (newValue == null) {
asyncCache.cache().statsCounter().recordLoadFailure(loadTime);
} else {
asyncCache.cache().statsCounter().recordLoadSuccess(loadTime);
}

asyncCache.cache().refreshes().remove(keyReference, castedFuture);
});
}
return castedFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,45 +116,46 @@ default CompletableFuture<V> refresh(K key) {

if (reloading[0] != null) {
reloading[0].whenComplete((newValue, error) -> {
boolean removed = cache().refreshes().remove(keyReference, reloading[0]);
long loadTime = cache().statsTicker().read() - startTime[0];
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
cache().refreshes().remove(keyReference, reloading[0]);
cache().statsCounter().recordLoadFailure(loadTime);
return;
}

boolean[] discard = new boolean[1];
cache().compute(key, (k, currentValue) -> {
if (currentValue == null) {
if (newValue == null) {
return null;
} else if (cache().refreshes().get(keyReference) == reloading[0]) {
return newValue;
}
} else if (currentValue == oldValue[0]) {
long expectedWriteTime = writeTime[0];
if (cache().hasWriteTime()) {
cache().getIfPresentQuietly(key, writeTime);
}
if (writeTime[0] == expectedWriteTime) {
return newValue;
var value = cache().compute(key, (k, currentValue) -> {
if (currentValue == oldValue[0]) {
if (currentValue == null) {
if (newValue == null) {
return null;
} else if (removed) {
return newValue;
}
} else {
long expectedWriteTime = writeTime[0];
if (cache().hasWriteTime()) {
cache().getIfPresentQuietly(key, writeTime);
}
if (writeTime[0] == expectedWriteTime) {
return newValue;
}
}
}
discard[0] = true;
return currentValue;
}, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);

if (discard[0] && cache().hasRemovalListener()) {
cache().notifyRemoval(key, newValue, RemovalCause.REPLACED);
if (discard[0] && (newValue != null) && cache().hasRemovalListener()) {
var cause = (value == null) ? RemovalCause.EXPLICIT : RemovalCause.REPLACED;
cache().notifyRemoval(key, newValue, cause);
}
if (newValue == null) {
cache().statsCounter().recordLoadFailure(loadTime);
} else {
cache().statsCounter().recordLoadSuccess(loadTime);
}

cache().refreshes().remove(keyReference, reloading[0]);
});
}

Expand Down
Loading

0 comments on commit 143137a

Please sign in to comment.