Skip to content

Commit

Permalink
Return the result of an immediate refreshAfterWrite (#688)
Browse files Browse the repository at this point in the history
In Guava if the CacheLoader returns a completed future on a reload then
the new value is returned to the caller and overwritten in the cache.
Otherwise the last read value is returned and the future overwrites
when it is done. This behavior is now supported instead of the more
naive fire-and-forget.

A quirk in Guava is when using Cache.get(key, callable). In Guava this
is wrapped as the cache loader and is used if a refresh is triggered.
That causes the supplied CacheLoader's reload to not be called, which
could be more surprising. However, their asMap().computeIfAbsent does
not trigger a refresh on read so the value is not reloaded. As this
behavior is confusing and appears accidental, Caffeine will always use
the attached CacheLoader and its reload function for refreshing.
  • Loading branch information
ben-manes committed Mar 27, 2022
1 parent 5bc7bcb commit cbc1cab
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,9 @@ void demoteFromMainProtected() {
* @param node the entry in the page replacement policy
* @param now the current time, in nanoseconds
* @param recordHit if the hit count should be incremented
* @return the refreshed value if immediately loaded, else null
*/
void afterRead(Node<K, V> node, long now, boolean recordHit) {
@Nullable V afterRead(Node<K, V> node, long now, boolean recordHit) {
if (recordHit) {
statsCounter().recordHits(1);
}
Expand All @@ -1233,7 +1234,7 @@ void afterRead(Node<K, V> node, long now, boolean recordHit) {
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
refreshIfNeeded(node, now);
return refreshIfNeeded(node, now);
}

/** Returns if the cache should bypass the read buffer. */
Expand All @@ -1246,11 +1247,12 @@ boolean skipReadBuffer() {
*
* @param node the entry in the cache to refresh
* @param now the current time, in nanoseconds
* @return the refreshed value if immediately loaded, else null
*/
@SuppressWarnings("FutureReturnValueIgnored")
void refreshIfNeeded(Node<K, V> node, long now) {
@Nullable V refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
return null;
}

K key;
Expand Down Expand Up @@ -1300,57 +1302,63 @@ void refreshIfNeeded(Node<K, V> node, long now) {
node.casWriteTime(refreshWriteTime, writeTime);
}

if (refreshFuture[0] != null) {
refreshFuture[0].whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime[0];
if (error != null) {
if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
}
refreshes.remove(keyReference, refreshFuture[0]);
statsCounter().recordLoadFailure(loadTime);
return;
if (refreshFuture[0] == null) {
return null;
}

var refreshed = refreshFuture[0].handle((newValue, error) -> {
long loadTime = statsTicker().read() - startTime[0];
if (error != null) {
if (!(error instanceof CancellationException) && !(error instanceof TimeoutException)) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
}
refreshes.remove(keyReference, refreshFuture[0]);
statsCounter().recordLoadFailure(loadTime);
return null;
}

@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;

boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (value != null);
return null;
} else if (currentValue == value) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (isAsync &&
(newValue == Async.getIfReady((CompletableFuture<?>) currentValue))) {
// If the completed futures hold the same value instance then no-op
return currentValue;
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
// If the entry was not modified while in-flight (no ABA) then replace
return value;
}
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, expiry(), /* recordMiss */ false,
/* recordLoad */ false, /* recordLoadFailure */ true);
@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture[0] : newValue;

if (discard[0]) {
notifyRemoval(key, value, RemovalCause.REPLACED);
}
if (newValue == null) {
statsCounter().recordLoadFailure(loadTime);
} else {
statsCounter().recordLoadSuccess(loadTime);
boolean[] discard = new boolean[1];
V result = compute(key, (k, currentValue) -> {
if (currentValue == null) {
// If the entry is absent then discard the refresh and maybe notifying the listener
discard[0] = (value != null);
return null;
} else if (currentValue == value) {
// If the reloaded value is the same instance then no-op
return currentValue;
} else if (isAsync &&
(newValue == Async.getIfReady((CompletableFuture<?>) currentValue))) {
// If the completed futures hold the same value instance then no-op
return currentValue;
} else if ((currentValue == oldValue) && (node.getWriteTime() == writeTime)) {
// If the entry was not modified while in-flight (no ABA) then replace
return value;
}
// Otherwise, a write invalidated the refresh so discard it and notify the listener
discard[0] = true;
return currentValue;
}, expiry(), /* recordMiss */ false,
/* recordLoad */ false, /* recordLoadFailure */ true);

refreshes.remove(keyReference, refreshFuture[0]);
});
}
if (discard[0]) {
notifyRemoval(key, value, RemovalCause.REPLACED);
}
if (newValue == null) {
statsCounter().recordLoadFailure(loadTime);
} else {
statsCounter().recordLoadSuccess(loadTime);
}

refreshes.remove(keyReference, refreshFuture[0]);
return result;
});
return Async.getIfReady(refreshed);
}

return null;
}

/**
Expand Down Expand Up @@ -2074,8 +2082,8 @@ public boolean containsValue(Object value) {
setAccessTime(node, now);
tryExpireAfterRead(node, castedKey, value, expiry(), now);
}
afterRead(node, now, recordStats);
return value;
V refreshed = afterRead(node, now, recordStats);
return (refreshed == null) ? value : refreshed;
}

@Override
Expand Down Expand Up @@ -2117,15 +2125,18 @@ public Map<K, V> getAllPresent(Iterable<? extends K> keys) {
if ((node == null) || ((value = node.getValue()) == null) || hasExpired(node, now)) {
iter.remove();
} else {
entry.setValue(value);

if (!isComputingAsync(node)) {
@SuppressWarnings("unchecked")
K castedKey = (K) entry.getKey();
tryExpireAfterRead(node, castedKey, value, expiry(), now);
setAccessTime(node, now);
}
afterRead(node, now, /* recordHit */ false);
V refreshed = afterRead(node, now, /* recordHit */ false);
if (refreshed == null) {
entry.setValue(value);
} else {
entry.setValue(refreshed);
}
}
}
statsCounter().recordHits(result.size());
Expand Down Expand Up @@ -2489,9 +2500,8 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}

afterRead(node, now, /* recordHit */ recordStats);
return value;
var refreshed = afterRead(node, now, /* recordHit */ recordStats);
return (refreshed == null) ? value : refreshed;
}
}
if (recordStats) {
Expand Down
Loading

0 comments on commit cbc1cab

Please sign in to comment.