Skip to content

Commit

Permalink
Return refresh future and ignore redundant refreshes
Browse files Browse the repository at this point in the history
A mapping of in-flight refreshes is now maintained and lazily
initialized if not used. This allows the cache to ignore redundant
requests for reloads, like Guava does. It also removes disablement
of expiration during refresh and resolves an ABA problem if the
entry is modified in a previously undectectable way. The refresh
future can now be obtained from LoadingCache to chain operations
against.

fixes #143
fixes #193
fixes #236
fixes #282
fixes #322
fixed #373
fixes #467
  • Loading branch information
ben-manes committed Jan 3, 2021
1 parent d1d2b23 commit 7c56a72
Show file tree
Hide file tree
Showing 15 changed files with 489 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand Down Expand Up @@ -220,10 +221,10 @@ abstract class BoundedLocalCache<K, V> extends BLCHeader.DrainStatusRef<K, V>
final Executor executor;
final boolean isAsync;

// The collection views
@Nullable transient Set<K> keySet;
@Nullable transient Collection<V> values;
@Nullable transient Set<Entry<K, V>> entrySet;
@Nullable Set<K> keySet;
@Nullable Collection<V> values;
@Nullable Set<Entry<K, V>> entrySet;
AtomicReference<ConcurrentMap<Object, CompletableFuture<?>>> refreshes;

/** Creates an instance based on the builder's configuration. */
protected BoundedLocalCache(Caffeine<K, V> builder,
Expand All @@ -233,6 +234,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
executor = builder.getExecutor();
writer = builder.getCacheWriter();
evictionLock = new ReentrantLock();
refreshes = new AtomicReference<>();
weigher = builder.getWeigher(isAsync);
drainBuffersTask = new PerformCleanupTask(this);
nodeFactory = NodeFactory.newFactory(builder, isAsync);
Expand Down Expand Up @@ -288,11 +290,29 @@ public final Executor executor() {
return executor;
}

@Override
@SuppressWarnings("NullAway")
public ConcurrentMap<Object, CompletableFuture<?>> refreshes() {
var pending = refreshes.get();
if (pending == null) {
pending = new ConcurrentHashMap<>();
if (!refreshes.compareAndSet(null, pending)) {
pending = refreshes.get();
}
}
return pending;
}

/** Returns whether this cache notifies a writer when an entry is modified. */
protected boolean hasWriter() {
return (writer != CacheWriter.disabledWriter());
}

@Override
public Object referenceKey(K key) {
return nodeFactory.newLookupKey(key);
}

/* --------------- Stats Support --------------- */

@Override
Expand Down Expand Up @@ -899,8 +919,9 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {
boolean[] removed = new boolean[1];
boolean[] resurrect = new boolean[1];
RemovalCause[] actualCause = new RemovalCause[1];
Object keyReference = node.getKeyReference();

data.computeIfPresent(node.getKeyReference(), (k, n) -> {
data.computeIfPresent(keyReference, (k, n) -> {
if (n != node) {
return n;
}
Expand Down Expand Up @@ -965,6 +986,12 @@ boolean evictEntry(Node<K, V> node, RemovalCause cause, long now) {

if (removed[0]) {
statsCounter().recordEviction(node.getWeight(), actualCause[0]);

var pending = refreshes.get();
if (pending != null) {
pending.remove(keyReference);
}

if (hasRemovalListener()) {
// Notify the listener only if the entry was evicted. This must be performed as the last
// step during eviction to safe guard against the executor rejecting the notification task.
Expand Down Expand Up @@ -1172,51 +1199,60 @@ void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}

K key;
V oldValue;
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = (now + ASYNC_EXPIRY);
if (((now - oldWriteTime) > refreshAfterWriteNanos())
long writeTime = node.getWriteTime();
Object keyReference = node.getKeyReference();
if (((now - writeTime) > refreshAfterWriteNanos()) && (keyReference != null)
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
try {
CompletableFuture<V> refreshFuture;
long startTime = statsTicker().read();
&& !refreshes().containsKey(keyReference)) {
long[] startTime = new long[1];
@SuppressWarnings({"unchecked", "rawtypes"})
CompletableFuture<V>[] refreshFuture = new CompletableFuture[1];
refreshes().computeIfAbsent(keyReference, k -> {
startTime[0] = statsTicker().read();
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = future.thenCompose(value ->
cacheLoader.asyncReload(key, value, executor));
refreshFuture = refresh;
var refresh = cacheLoader.asyncReload(key, future.join(), executor);
refreshFuture[0] = refresh;
} else {
// no-op if load is pending
node.casWriteTime(refreshWriteTime, oldWriteTime);
return;
return future;
}
} else {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture = refresh;
var refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture[0] = refresh;
}
refreshFuture.whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime;
return refreshFuture[0];
});

if (refreshFuture[0] != null) {
refreshFuture[0].whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime[0];
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
node.casWriteTime(refreshWriteTime, oldWriteTime);
refreshes().remove(keyReference, refreshFuture[0]);
statsCounter().recordLoadFailure(loadTime);
return;
}

@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;

boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
return value;
} else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
if (value == null) {
return null;
} else if (refreshes().get(key) == refreshFuture[0]) {
return value;
}
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
return value;
}
discard[0] = true;
Expand All @@ -1231,10 +1267,9 @@ void refreshIfNeeded(Node<K, V> node, long now) {
} else {
statsCounter().recordLoadSuccess(loadTime);
}

refreshes().remove(keyReference, refreshFuture[0]);
});
} catch (Throwable t) {
node.casWriteTime(refreshWriteTime, oldWriteTime);
logger.log(Level.ERROR, "Exception thrown when submitting refresh task", t);
}
}
}
Expand Down Expand Up @@ -1782,8 +1817,12 @@ public void clear() {
}

// Discard all entries
for (Node<K, V> node : data.values()) {
removeNode(node, now);
var pending = refreshes.get();
for (var entry : data.entrySet()) {
removeNode(entry.getValue(), now);
if (pending != null) {
pending.remove(entry.getKey());
}
}

// Discard all pending reads
Expand Down Expand Up @@ -2099,8 +2138,9 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (k, n) -> {
data.computeIfPresent(lookupKey, (k, n) -> {
synchronized (n) {
oldValue[0] = n.getValue();
if (oldValue[0] == null) {
Expand All @@ -2118,6 +2158,11 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
});

if (cause[0] != null) {
var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}

afterWrite(new RemovalTask(node[0]));
if (hasRemovalListener()) {
notifyRemoval(castKey, oldValue[0], cause[0]);
Expand All @@ -2140,8 +2185,9 @@ public boolean remove(Object key, Object value) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
RemovalCause[] cause = new RemovalCause[1];
Object lookupKey = nodeFactory.newLookupKey(key);

data.computeIfPresent(nodeFactory.newLookupKey(key), (kR, node) -> {
data.computeIfPresent(lookupKey, (kR, node) -> {
synchronized (node) {
oldKey[0] = node.getKey();
oldValue[0] = node.getValue();
Expand All @@ -2163,7 +2209,13 @@ public boolean remove(Object key, Object value) {

if (removed[0] == null) {
return false;
} else if (hasRemovalListener()) {
}

var pending = refreshes.get();
if (pending != null) {
pending.remove(lookupKey);
}
if (hasRemovalListener()) {
notifyRemoval(oldKey[0], oldValue[0], cause[0]);
}
afterWrite(new RemovalTask(removed[0]));
Expand Down Expand Up @@ -2582,15 +2634,14 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
if (expiresAfterWrite() || (weightedDifference != 0)) {
afterWrite(new UpdateTask(node, weightedDifference));
} else {
if (cause[0] == null) {
if (!isComputingAsync(node)) {
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
} else if (cause[0] == RemovalCause.COLLECTED) {
scheduleDrainBuffers();
if ((cause[0] == null) && !isComputingAsync(node)) {
tryExpireAfterRead(node, key, newValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
afterRead(node, now[0], /* recordHit */ false);
if ((cause[0] != null) && cause[0].wasEvicted()) {
scheduleDrainBuffers();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.benmanes.caffeine.cache;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.checkerframework.checker.nullness.qual.NonNull;
Expand Down Expand Up @@ -101,9 +102,13 @@ public interface LoadingCache<K, V> extends Cache<K, V> {
* Caches loaded by a {@link CacheLoader} will call {@link CacheLoader#reload} if the cache
* currently contains a value for the {@code key}, and {@link CacheLoader#load} otherwise. Loading
* is asynchronous by delegating to the default executor.
* <p>
* Returns an existing future without doing anything if another thread is currently loading the
* value for {@code key}.
*
* @param key key with which a value may be associated
* @return the future that is loading the value
* @throws NullPointerException if the specified key is null
*/
void refresh(@NonNull K key);
CompletableFuture<V> refresh(@NonNull K key);
}
Loading

0 comments on commit 7c56a72

Please sign in to comment.