Skip to content

Commit

Permalink
Added snapshot view of in-flight refresh operations
Browse files Browse the repository at this point in the history
The ongoing refresh operations can now be observed by using
Policy.refreshes(). This does not provide a live view due to weak
keys needing to be unwrapped from reference key (and possibly
discarded if null). Instead an unmodifiable copy is provided for
inspection. The futures may be canceled, completed, or waited upon.
  • Loading branch information
ben-manes committed Feb 21, 2021
1 parent 0d23987 commit a95ae26
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -1508,8 +1509,7 @@ public void cleanUp() {
}

/**
* Performs the maintenance work, blocking until the lock is acquired. Any exception thrown, such
* as by {@link CacheWriter#delete}, is propagated to the caller.
* Performs the maintenance work, blocking until the lock is acquired.
*
* @param task an additional pending task to run, or {@code null} if not present
*/
Expand Down Expand Up @@ -3485,6 +3485,27 @@ static final class BoundedPolicy<K, V> implements Policy<K, V> {
}
return transformer.apply(node.getValue());
}
@Override public Map<K, CompletableFuture<V>> refreshes() {
var refreshes = cache.refreshes;
if ((refreshes == null) || refreshes.isEmpty()) {
return Map.of();
} if (cache.collectKeys()) {
var inFlight = new IdentityHashMap<K, CompletableFuture<V>>(refreshes.size());
for (var entry : refreshes.entrySet()) {
@SuppressWarnings("unchecked")
var key = ((InternalReference<K>) entry.getKey()).get();
@SuppressWarnings("unchecked")
var future = (CompletableFuture<V>) entry.getValue();
if (key != null) {
inFlight.put(key, future);
}
}
return Collections.unmodifiableMap(inFlight);
}
@SuppressWarnings("unchecked")
var castedRefreshes = (Map<K, CompletableFuture<V>>) (Object) refreshes;
return Map.copyOf(castedRefreshes);
}
@Override public Optional<Eviction<K, V>> eviction() {
return cache.evicts()
? (eviction == null) ? (eviction = Optional.of(new BoundedEviction())) : eviction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.checkerframework.checker.index.qual.NonNegative;
Expand Down Expand Up @@ -56,6 +57,13 @@ public interface Policy<K extends @NonNull Object, V extends @NonNull Object> {
@Nullable
V getIfPresentQuietly(K key);

/**
* Returns an unmodifiable snapshot {@link Map} view of the in-flight refresh operations.
*
* @return a snapshot view of the in-flight refresh operations
*/
Map<K, CompletableFuture<V>> refreshes();

/**
* Returns access to perform operations based on the maximum size or maximum weight eviction
* policy. If the cache was not constructed with a size-based bound or the implementation does
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,6 +921,15 @@ static final class UnboundedPolicy<K, V> implements Policy<K, V> {
@Override public V getIfPresentQuietly(Object key) {
return transformer.apply(cache.data.get(key));
}
@Override public Map<K, CompletableFuture<V>> refreshes() {
var refreshes = cache.refreshes;
if (refreshes == null) {
return Map.of();
}
@SuppressWarnings("unchecked")
var castedRefreshes = (Map<K, CompletableFuture<V>>) (Object) refreshes;
return Map.copyOf(castedRefreshes);
}
@Override public Optional<Eviction<K, V>> eviction() {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -727,4 +728,20 @@ public void getIfPresentQuietly_present(Cache<Integer, Integer> cache, CacheCont
assertThat(cache.policy().getIfPresentQuietly(context.middleKey()), is(not(nullValue())));
assertThat(cache.policy().getIfPresentQuietly(context.lastKey()), is(not(nullValue())));
}

/* --------------- Policy: refreshes --------------- */

@CacheSpec
@CheckNoStats
@Test(dataProvider = "caches")
public void refreshes_empty(Cache<Integer, Integer> cache, CacheContext context) {
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
}

@CacheSpec
@CheckNoStats
@Test(dataProvider = "caches", expectedExceptions = UnsupportedOperationException.class)
public void refreshes_unmodifiable(Cache<Integer, Integer> cache, CacheContext context) {
cache.policy().refreshes().clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.stream.Collectors.toMap;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty;
Expand Down Expand Up @@ -811,4 +812,22 @@ public void bulk_present() throws Exception {
assertThat(loader.loadAll(Set.of(1, 2)), is(Map.of(1, 1, 2, 2)));
assertThat(loader.load(1), is(1));
}

/* --------------- Policy: refreshes --------------- */

@Test(dataProvider = "caches")
@CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine)
public void refreshes(LoadingCache<Integer, Integer> cache, CacheContext context) {
var key1 = Iterables.get(context.absentKeys(), 0);
var key2 = context.original().isEmpty()
? Iterables.get(context.absentKeys(), 1)
: context.firstKey();
var future1 = cache.refresh(key1);
var future2 = cache.refresh(key2);
assertThat(cache.policy().refreshes(), is(equalTo(Map.of(key1, future1, key2, future2))));

future1.complete(1);
future2.cancel(true);
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
Expand Down Expand Up @@ -373,7 +375,25 @@ public void refresh(LoadingCache<Integer, Integer> cache, CacheContext context)
future2.cancel(true);
}

/* --------------- Policy --------------- */
/* --------------- Policy: refreshes --------------- */

@Test(dataProvider = "caches")
@CacheSpec(loader = Loader.ASYNC_INCOMPLETE, implementation = Implementation.Caffeine,
refreshAfterWrite = Expire.ONE_MINUTE, population = Population.FULL)
public void refreshes(LoadingCache<Integer, Integer> cache, CacheContext context) {
context.ticker().advance(2, TimeUnit.MINUTES);
cache.getIfPresent(context.firstKey());
assertThat(cache.policy().refreshes(), is(aMapWithSize(1)));

var future = cache.policy().refreshes().get(context.firstKey());
assertThat(future, is(not(nullValue())));

future.complete(Integer.MAX_VALUE);
assertThat(cache.policy().refreshes(), is(anEmptyMap()));
assertThat(cache.getIfPresent(context.firstKey()), is(Integer.MAX_VALUE));
}

/* --------------- Policy: refreshAfterWrite --------------- */

@Test(dataProvider = "caches")
@CacheSpec(implementation = Implementation.Caffeine, refreshAfterWrite = Expire.ONE_MINUTE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ public Policy<K, V> policy() {
@Override public V getIfPresentQuietly(K key) {
return cache.asMap().get(key);
}
@Override public Map<K, CompletableFuture<V>> refreshes() {
return Map.of();
}
@Override public Optional<Eviction<K, V>> eviction() {
return Optional.empty();
}
Expand Down

0 comments on commit a95ae26

Please sign in to comment.