Refresh as a non-blocking write (fixes #56, fixes #69)
Previously, refresh was implemented as a computation performed on the
executor. Like all writes it allowed concurrent reads, but it blocked
other writes, such as updates, invalidations, and evictions. This provided
strong consistency at the cost of lock granularity (a hash bin) and could
waste a thread in an asynchronous cache. The reported deadlock showed
that this simple model was also broken.

A refresh is now performed without blocking and more closely matches
Guava's behavior. The newly loaded value is dropped if the mapping now
points to a different value. Like Guava, if the entry has disappeared
(e.g. due to eviction), the loaded value is inserted. Usage through
refreshAfterWrite is preferred, as it tries to avoid redundant in-flight
loads. Unlike Guava, an explicit LoadingCache#refresh() cannot detect and
ignore redundant loads. It may be possible to strengthen the
implementation, but explicit refreshes are rare. As in Guava, the
approach is not ABA safe; it is best effort and does what users would
likely prefer. For stricter reload behavior, users should perform a
Map#compute instead.
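
As a usage illustration (a minimal sketch, not part of this commit; the
fetchConfig loader is hypothetical): a read past the refresh interval
returns the stale value while a non-blocking reload runs, whereas
Map#compute gives the stricter replace-in-place behavior.

  import java.util.concurrent.TimeUnit;

  import com.github.benmanes.caffeine.cache.Caffeine;
  import com.github.benmanes.caffeine.cache.LoadingCache;

  public final class RefreshSketch {
    public static void main(String[] args) {
      // Reloads are triggered by reads once an entry is older than the interval
      LoadingCache<String, String> cache = Caffeine.newBuilder()
          .refreshAfterWrite(1, TimeUnit.MINUTES)
          .build(RefreshSketch::fetchConfig);

      // Returns the cached value; if stale, also kicks off an async reload
      String value = cache.get("endpoint");

      // Strict alternative: replace in place under the map's computation lock
      cache.asMap().compute("endpoint", (key, old) -> fetchConfig(key));
    }

    // Hypothetical stand-in for an expensive load (e.g. a remote call)
    private static String fetchConfig(String key) {
      return "value-of-" + key;
    }
  }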

Load testing uncovered a weighted eviction bug in caches heavily
dominated by zero-weight entries (e.g. incomplete futures). Eviction from
the main space would find no victims and needed to fall back to scanning
the admission window.
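
For illustration, a sketch of how such a cache can arise (the weigher
below is a hypothetical example, not the library's internal one): while a
load is in flight the future weighs zero, so a cache full of pending
futures offers the main space no eviction victims.

  import java.util.concurrent.CompletableFuture;

  import com.github.benmanes.caffeine.cache.Cache;
  import com.github.benmanes.caffeine.cache.Caffeine;

  public final class ZeroWeightSketch {
    public static void main(String[] args) {
      Cache<String, CompletableFuture<byte[]>> cache = Caffeine.newBuilder()
          .maximumWeight(1_000)
          // In-flight loads weigh nothing; completed values weigh their size
          .weigher((String key, CompletableFuture<byte[]> future) ->
              (future.isDone() && !future.isCompletedExceptionally())
                  ? future.join().length : 0)
          .build();

      // A zero-weight entry never counts against the maximum weight
      cache.put("pending", new CompletableFuture<>());
    }
  }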

Thanks to everyone who helped in the discussions to wrap my head around
how to properly implement this.
ben-manes committed Apr 23, 2016
1 parent 1e6ec2d commit fa44d6b
Showing 26 changed files with 561 additions and 280 deletions.
@@ -42,7 +42,7 @@ protected void execute() {
         .returns(REMOVAL_LISTENER)
         .build());
     context.cache.addMethod(MethodSpec.methodBuilder("hasRemovalListener")
-        .addModifiers(protectedFinalModifiers)
+        .addModifiers(publicFinalModifiers)
         .addStatement("return true")
         .returns(boolean.class)
         .build());
@@ -18,8 +18,6 @@
 import java.util.concurrent.ConcurrentHashMap;

 import org.apache.jackrabbit.oak.cache.CacheLIRS;
-import org.cache2k.impl.ClockProPlusCache;
-import org.cache2k.impl.LruCache;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 import org.infinispan.commons.equivalence.AnyEquivalence;
 import org.infinispan.commons.util.concurrent.jdk8backported.BoundedEquivalentConcurrentHashMapV8;
@@ -70,14 +68,9 @@ public enum CacheType {

   /* ---------------- Bounded -------------- */

-  Cache2k_ClockProPlus {
+  Cache2k {
     @Override public <K, V> BasicCache<K, V> create(int maximumSize) {
-      return new Cache2k<>(ClockProPlusCache.class, maximumSize);
-    }
-  },
-  Cache2k_Lru {
-    @Override public <K, V> BasicCache<K, V> create(int maximumSize) {
-      return new Cache2k<>(LruCache.class, maximumSize);
+      return new Cache2k<>(maximumSize);
     }
   },
   Caffeine {
@@ -16,7 +16,7 @@
 package com.github.benmanes.caffeine.cache.impl;

 import org.cache2k.Cache;
-import org.cache2k.CacheBuilder;
+import org.cache2k.Cache2kBuilder;

 import com.github.benmanes.caffeine.cache.BasicCache;

@@ -26,10 +26,9 @@
 public final class Cache2k<K, V> implements BasicCache<K, V> {
   private final Cache<K, V> cache;

-  @SuppressWarnings({"unchecked", "deprecation"})
-  public Cache2k(Class<?> implementation, int maximumSize) {
-    cache = (Cache<K, V>) CacheBuilder.newCache(Object.class, Object.class)
-        .implementation(implementation)
+  @SuppressWarnings("unchecked")
+  public Cache2k(int maximumSize) {
+    cache = (Cache<K, V>) Cache2kBuilder.forUnknownTypes()
         .entryCapacity(maximumSize)
         .eternal(true)
         .build();
@@ -16,6 +16,9 @@
 package com.github.benmanes.caffeine.cache;

 import static com.github.benmanes.caffeine.cache.Caffeine.requireState;
+import static com.github.benmanes.caffeine.cache.Node.EDEN;
+import static com.github.benmanes.caffeine.cache.Node.PROBATION;
+import static com.github.benmanes.caffeine.cache.Node.PROTECTED;
 import static java.util.Objects.requireNonNull;

 import java.io.InvalidObjectException;
@@ -266,13 +269,13 @@ public RemovalListener<K, V> removalListener() {
     return null;
   }

-  /** Returns whether this cache notifies when an entry is removed. */
-  protected boolean hasRemovalListener() {
+  @Override
+  public boolean hasRemovalListener() {
     return false;
   }

-  /** Asynchronously sends a removal notification to the listener. */
-  void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause) {
+  @Override
+  public void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause) {
     requireState(hasRemovalListener(), "Notification should be guarded with a check");
     Runnable task = () -> {
       try {
@@ -355,6 +358,11 @@ protected void setRefreshAfterWriteNanos(long refreshAfterWriteNanos) {
     throw new UnsupportedOperationException();
   }

+  @Override
+  public boolean hasWriteTime() {
+    return expiresAfterWrite() || refreshAfterWrite();
+  }
+
   @Override
   public Ticker expirationTicker() {
     return Ticker.disabledTicker();
@@ -524,6 +532,7 @@ int evictFromEden() {
    */
   @GuardedBy("evictionLock")
   void evictFromMain(int candidates) {
+    int victimQueue = PROBATION;
     Node<K, V> victim = accessOrderProbationDeque().peekFirst();
     Node<K, V> candidate = accessOrderProbationDeque().peekLast();
     while (weightedSize() > maximum()) {
@@ -532,14 +541,20 @@ void evictFromMain(int candidates) {
         candidate = null;
       }

-      // Start evicting from the protected queue
+      // Try evicting from the protected and eden queues
       if ((candidate == null) && (victim == null)) {
-        victim = accessOrderProtectedDeque().peekFirst();
+        if (victimQueue == PROBATION) {
+          victim = accessOrderProtectedDeque().peekFirst();
+          victimQueue = PROTECTED;
+          continue;
+        } else if (victimQueue == PROTECTED) {
+          victim = accessOrderEdenDeque().peekFirst();
+          victimQueue = EDEN;
+          continue;
+        }

         // The pending operations will adjust the size to reflect the correct weight
-        if (victim == null) {
-          break;
-        }
+        break;
       }

       // Skip over entries with zero weight
@@ -728,7 +743,11 @@ void evictEntry(Node<K, V> node, RemovalCause cause, long now) {
           return n;
         }
       } else if (actualCause == RemovalCause.SIZE) {
-        if (node.getWeight() == 0) {
+        int weight;
+        synchronized (node) {
+          weight = node.getWeight();
+        }
+        if (weight == 0) {
           resurrect[0] = true;
           return n;
         }
@@ -805,45 +824,63 @@ void refreshIfNeeded(Node<K, V> node, long now) {
     if (!refreshAfterWrite()) {
       return;
     }
+    K key;
+    V oldValue;
     long oldWriteTime = node.getWriteTime();
     long refreshWriteTime = isAsync ? Long.MAX_VALUE : now;
     if (((now - oldWriteTime) > refreshAfterWriteNanos())
+        && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
         && node.casWriteTime(oldWriteTime, refreshWriteTime)) {
       try {
-        executor().execute(() -> {
-          K key = node.getKey();
-          if ((key != null) && node.isAlive()) {
-            BiFunction<? super K, ? super V, ? extends V> refreshFunction = (k, v) -> {
-              if (node.getWriteTime() != refreshWriteTime) {
-                return v;
-              }
-              try {
-                if (isAsync) {
-                  @SuppressWarnings("unchecked")
-                  V oldValue = ((CompletableFuture<V>) v).join();
-                  CompletableFuture<V> future =
-                      cacheLoader.asyncReload(key, oldValue, Runnable::run);
-                  if (future.join() == null) {
-                    return null;
-                  }
-                  @SuppressWarnings("unchecked")
-                  V castFuture = (V) future;
-                  return castFuture;
-                }
-                return cacheLoader.reload(k, v);
-              } catch (Exception e) {
-                node.setWriteTime(oldWriteTime);
-                return LocalCache.throwUnchecked(e);
-              }
-            };
-            try {
-              computeIfPresent(key, refreshFunction);
-            } catch (Throwable t) {
-              logger.log(Level.WARNING, "Exception thrown during refresh", t);
+        CompletableFuture<V> refreshFuture;
+        if (isAsync) {
+          @SuppressWarnings("unchecked")
+          CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
+          if (Async.isReady(future)) {
+            refreshFuture = future.thenCompose(value ->
+                cacheLoader.asyncReload(key, value, executor));
+          } else {
+            // no-op if load is pending
+            node.casWriteTime(refreshWriteTime, oldWriteTime);
+            return;
+          }
+        } else {
+          refreshFuture = cacheLoader.asyncReload(key, oldValue, executor);
+        }
+        refreshFuture.whenComplete((newValue, error) -> {
+          long loadTime = statsTicker().read() - now;
+          if (error != null) {
+            logger.log(Level.WARNING, "Exception thrown during refresh", error);
+            node.casWriteTime(refreshWriteTime, oldWriteTime);
+            statsCounter().recordLoadFailure(loadTime);
+            return;
+          }
+
+          @SuppressWarnings("unchecked")
+          V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
+
+          boolean[] discard = new boolean[1];
+          compute(key, (k, currentValue) -> {
+            if (currentValue == null) {
+              return value;
+            } else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
+              return value;
             }
+            discard[0] = true;
+            return currentValue;
+          }, /* recordMiss */ false, /* recordLoad */ false);
+
+          if (discard[0] && hasRemovalListener()) {
+            notifyRemoval(key, value, RemovalCause.REPLACED);
           }
+          if (newValue == null) {
+            statsCounter().recordLoadFailure(loadTime);
+          } else {
+            statsCounter().recordLoadSuccess(loadTime);
+          }
         });
       } catch (Throwable t) {
+        node.casWriteTime(refreshWriteTime, oldWriteTime);
         logger.log(Level.SEVERE, "Exception thrown when submitting refresh task", t);
       }
@@ -1372,6 +1409,18 @@ public V getIfPresent(Object key, boolean recordStats) {
     return node.getValue();
   }

+  @Override
+  public V getIfPresentQuietly(Object key, long[/* 1 */] writeTime) {
+    V value;
+    Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
+    if ((node == null) || ((value = node.getValue()) == null)
+        || hasExpired(node, expirationTicker().read())) {
+      return null;
+    }
+    writeTime[0] = node.getWriteTime();
+    return value;
+  }
+
   @Override
   public Map<K, V> getAllPresent(Iterable<?> keys) {
     int misses = 0;
@@ -1401,24 +1450,24 @@ public Map<K, V> getAllPresent(Iterable<?> keys) {
   public V put(K key, V value) {
     int weight = weigher.weigh(key, value);
     return (weight > 0)
-        ? putFast(key, value, weight, true, false)
-        : putSlow(key, value, weight, true, false);
+        ? putFast(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ false)
+        : putSlow(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ false);
   }

   @Override
   public V put(K key, V value, boolean notifyWriter) {
     int weight = weigher.weigh(key, value);
     return (weight > 0)
-        ? putFast(key, value, weight, notifyWriter, false)
-        : putSlow(key, value, weight, notifyWriter, false);
+        ? putFast(key, value, weight, notifyWriter, /* onlyIfAbsent */ false)
+        : putSlow(key, value, weight, notifyWriter, /* onlyIfAbsent */ false);
   }

   @Override
   public V putIfAbsent(K key, V value) {
     int weight = weigher.weigh(key, value);
     return (weight > 0)
-        ? putFast(key, value, weight, true, true)
-        : putSlow(key, value, weight, true, true);
+        ? putFast(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ true)
+        : putSlow(key, value, weight, /* notifyWriter */ true, /* onlyIfAbsent */ true);
   }

   /**
@@ -1823,7 +1872,7 @@ public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {

   @Override
   public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction,
-      boolean isAsync) {
+      boolean recordStats, boolean recordLoad) {
     requireNonNull(key);
     requireNonNull(mappingFunction);
     long now = expirationTicker().read();
@@ -1837,13 +1886,16 @@ public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction
         return value;
       }
     }
+    if (recordStats) {
+      mappingFunction = statsAware(mappingFunction, recordLoad);
+    }
     Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
-    return doComputeIfAbsent(key, keyRef, mappingFunction, isAsync, now);
+    return doComputeIfAbsent(key, keyRef, mappingFunction, now);
   }

   /** Returns the current value from a computeIfAbsent invocation. */
-  V doComputeIfAbsent(K key, Object keyRef, Function<? super K, ? extends V> mappingFunction,
-      boolean isAsync, long now) {
+  V doComputeIfAbsent(K key, Object keyRef,
+      Function<? super K, ? extends V> mappingFunction, long now) {
     @SuppressWarnings("unchecked")
     V[] oldValue = (V[]) new Object[1];
     @SuppressWarnings("unchecked")
@@ -1857,7 +1909,7 @@ V doComputeIfAbsent(K key, Object keyRef, Function<? super K, ? extends V> mappi
     RemovalCause[] cause = new RemovalCause[1];
     Node<K, V> node = data.compute(keyRef, (k, n) -> {
       if (n == null) {
-        newValue[0] = statsAware(mappingFunction, isAsync).apply(key);
+        newValue[0] = mappingFunction.apply(key);
         if (newValue[0] == null) {
           return null;
         }
@@ -1881,7 +1933,7 @@ V doComputeIfAbsent(K key, Object keyRef, Function<? super K, ? extends V> mappi
       }

       writer.delete(nodeKey[0], oldValue[0], cause[0]);
-      newValue[0] = statsAware(mappingFunction, isAsync).apply(key);
+      newValue[0] = mappingFunction.apply(key);
       if (newValue[0] == null) {
         removed[0] = n;
         n.retire();
@@ -1938,21 +1990,21 @@ public V computeIfPresent(K key,

     boolean computeIfAbsent = false;
     BiFunction<? super K, ? super V, ? extends V> statsAwareRemappingFunction =
-        statsAware(remappingFunction, false, false);
+        statsAware(remappingFunction, /* recordMiss */ false, /* recordLoad */ true);
     return remap(key, lookupKey, statsAwareRemappingFunction, now, computeIfAbsent);
   }

   @Override
   public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction,
-      boolean recordMiss, boolean isAsync) {
+      boolean recordMiss, boolean recordLoad) {
     requireNonNull(key);
     requireNonNull(remappingFunction);

     long now = expirationTicker().read();
     boolean computeIfAbsent = true;
     Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
     BiFunction<? super K, ? super V, ? extends V> statsAwareRemappingFunction =
-        statsAware(remappingFunction, recordMiss, isAsync);
+        statsAware(remappingFunction, recordMiss, recordLoad);
     return remap(key, keyRef, statsAwareRemappingFunction, now, computeIfAbsent);
   }
