Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Return refresh future and ignore redundant refreshes
Browse files Browse the repository at this point in the history
A mapping of in-flight refreshes is now maintained and lazily
initialized if not used. This allows the cache to ignore redundant
requests for reloads, like Guava does. It also removes disablement
of expiration during refresh and resolves an ABA problem if the
entry is modified in a previously undectectable way. The refresh
future can now be obtained from LoadingCache to chain operations
against.

fixes #143
fixes #193
fixes #236
fixes #282
fixes #322
fixed #373
fixes #467
ben-manes committed Feb 8, 2021
1 parent 2f4db02 commit 76a9660
Showing 16 changed files with 573 additions and 133 deletions.
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -221,10 +222,10 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
final Executor executor;
final boolean isAsync;

// The collection views
@Nullable transient Set<K> keySet;
@Nullable transient Collection<V> values;
@Nullable transient Set<Entry<K, V>> entrySet;
@Nullable Set<K> keySet;
@Nullable Collection<V> values;
@Nullable Set<Entry<K, V>> entrySet;
AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;

/** Creates an instance based on the builder's configuration. */
protected BoundedLocalCache(Caffeine<K, V> builder,
@@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
this.cacheLoader = cacheLoader;
executor = builder.getExecutor();
evictionLock = new ReentrantLock();
refreshes = new AtomicReference<>();
weigher = builder.getWeigher(isAsync);
writer = builder.getCacheWriter(isAsync);
drainBuffersTask = new PerformCleanupTask(this);
@@ -289,11 +291,29 @@ public final Executor executor() {
return executor;
}

@Override
@SuppressWarnings("NullAway")
public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
var pending = refreshes.get();
if (pending == null) {
pending = new ConcurrentHashMap<>();
if (!refreshes.compareAndSet(null, pending)) {
pending = refreshes.get();
}
}
return pending;
}

/** Returns whether this cache notifies a writer when an entry is modified. */
protected boolean hasWriter() {
return (writer != CacheWriter.disabledWriter());
}

@Override
public Object referenceKey(K key) {
return nodeFactory.newLookupKey(key);
}

/* --------------- Stats Support --------------- */

@Override
@@ -900,8 +920,9 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause[] actualCause = new RemovalCause[1];
Object keyReference = node.getKeyReference();

data.computeIfPresent(node.getKeyReference(), (k, n) -> {
data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
return n;
}
@@ -964,6 +985,12 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {

if (removed[0]) {
statsCounter().recordEviction(node.getWeight(), actualCause[0]);

var pending = refreshes.get();
if (pending != null) {
pending.remove(keyReference);
}

if (hasRemovalListener()) {
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safe guard against the executor rejecting the notification task.
@@ -1171,51 +1198,60 @@ void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}

K key;
V oldValue;
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = (now + ASYNC_EXPIRY);
if (((now - oldWriteTime) > refreshAfterWriteNanos())
long writeTime = node.getWriteTime();
Object keyReference = node.getKeyReference();
if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
try {
CompletableFuture<V> refreshFuture;
long startTime = statsTicker().read();
&& !refreshes().containsKey(keyReference)) {
long[] startTime = new long[1];
@SuppressWarnings({"unchecked", "rawtypes"})
CompletableFuture<V>[] refreshFuture = new CompletableFuture[1];
refreshes().computeIfAbsent(keyReference, k -> {
startTime[0] = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = future.thenCompose(value ->
cacheLoader.asyncReload(key, value, executor));
refreshFuture = refresh;
var refresh = cacheLoader.asyncReload(key, future.join(), executor);
refreshFuture[0] = refresh;
} else {
// no-op if load is pending
node.casWriteTime(refreshWriteTime, oldWriteTime);
return;
return future;
}
} else {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture = refresh;
var refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture[0] = refresh;
}
refreshFuture.whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime;
return refreshFuture[0];
});

if (refreshFuture[0] != null) {
refreshFuture[0].whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime[0];
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
node.casWriteTime(refreshWriteTime, oldWriteTime);
refreshes().remove(keyReference, refreshFuture[0]);
statsCounter().recordLoadFailure(loadTime);
return;
}

@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;

boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
return value;
} else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
if (value == null) {
return null;
} else if (refreshes().get(key) == refreshFuture[0]) {
return value;
}
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
return value;
}
discard[0] = true;
@@ -1230,10 +1266,9 @@ void refreshIfNeeded(Node<K, V> node, long now) {
} else {
statsCounter().recordLoadSuccess(loadTime);
}

refreshes().remove(keyReference, refreshFuture[0]);
});
} catch (Throwable t) {
node.casWriteTime(refreshWriteTime, oldWriteTime);
logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
}
}
}
@@ -1781,8 +1816,12 @@ public void clear() {
}

// Discard all entries
for (Node<K, V> node : data.values()) {
removeNode(node, now);
var pending = refreshes.get();
for (var entry : data.entrySet()) {
removeNode(entry.getValue(), now);
if (pending != null) {
pending.remove(entry.getKey());
}
}

// Discard all pending reads
@@ -2098,8 +2137,9 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> {
data.computeIfPresent(lookupKey, (k, n) -> {
synchronized (n) {
oldValue[0] = n.getValue();
if (oldValue[0] == null) {
@@ -2117,6 +2157,11 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
});

if (cause[0] != null) {
var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}

afterWrite(new RemovalTask(node[0]));
if (hasRemovalListener()) {
notifyRemoval(castKey, oldValue[0], cause[0]);
@@ -2139,8 +2184,9 @@ public boolean remove(Object key, Object value) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> {
data.computeIfPresent(lookupKey, (kR, node) -> {
synchronized (node) {
oldKey[0] = node.getKey();
oldValue[0] = node.getValue();
@@ -2162,7 +2208,13 @@ public boolean remove(Object key, Object value) {

if (removed[0] == null) {
return false;
} else if (hasRemovalListener()) {
}

var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}
if (hasRemovalListener()) {
notifyRemoval(oldKey[0], oldValue[0], cause[0]);
}
afterWrite(new RemovalTask(removed[0]));
@@ -2581,15 +2633,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
if (expiresAfterWrite() || (weightedDifference != 0)) {
afterWrite(new UpdateTask(node, weightedDifference));
} else {
if (cause[0] == null) {
if (!isComputingAsync(node)) {
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
} else if (cause[0] == RemovalCause.COLLECTED) {
scheduleDrainBuffers();
if ((cause[0] == null) && !isComputingAsync(node)) {
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
afterRead(node, now[0], /* recordHit */ false);
if ((cause[0] != null) && cause[0].wasEvicted()) {
scheduleDrainBuffers();
}
}
}

Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
package com.github.benmanes.caffeine.cache;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.checkerframework.checker.nullness.qual.NonNull;
@@ -101,9 +102,13 @@ public interface LoadingCache<K, V> extends Cache<K, V> {
* Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
* currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
* is asynchronous by delegating to the default executor.
* <p>
* Returns an existing future without doing anything if another thread is currently loading the
* value for {@code key}.
*
* @param key key with which a value may be associated
* @return the future that is loading the value
* @throws NullPointerException if the specified key is null
*/
void refresh(@NonNull K key);
CompletableFuture<V> refresh(@NonNull K key);
}
Loading

0 comments on commit 76a9660

Please sign in to comment.