From be754c5faa8de0676677dcdea8153318461b95bd Mon Sep 17 00:00:00 2001 From: Ben Manes Date: Fri, 5 May 2017 01:42:09 -0700 Subject: [PATCH] Added ability to inspect the variable expiration policy Supporting a snapshot view of the entries in expiration order required adding an overflow bucket. Previously overflow was scheduled on the furthest away bucket, but this meant order could be less exact when traversing. Note that order within a bucket is not computed so the view is only a best guess. --- .../caffeine/cache/BoundedLocalCache.java | 114 ++++++++++++------ .../benmanes/caffeine/cache/TimerWheel.java | 76 ++++++++++-- .../caffeine/cache/TimerWheelTest.java | 72 ++++++++--- .../caffeine/cache/testing/CacheContext.java | 5 +- gradle/dependencies.gradle | 2 +- 5 files changed, 199 insertions(+), 70 deletions(-) diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index f0f9d6876c..dacc27b63f 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -867,7 +867,7 @@ void refreshIfNeeded(Node node, long now) { K key; V oldValue; long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = isAsync ? Long.MAX_VALUE : now; + long refreshWriteTime = isAsync ? (now + Async.MAXIMUM_EXPIRY) : now; if (((now - oldWriteTime) > refreshAfterWriteNanos()) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) && node.casWriteTime(oldWriteTime, refreshWriteTime)) { @@ -1331,18 +1331,15 @@ public void run() { } } - // Ensure that in-flight async computation cannot expire + // Ensure that in-flight async computation cannot expire (reset on a completion callback) if (isComputingAsync(node)) { - CompletableFuture future = (CompletableFuture) node.getValue(); - if (future != null) { - node.setVariableTime(Long.MAX_VALUE); - setAccessTime(node, Long.MAX_VALUE); - setWriteTime(node, Long.MAX_VALUE); - future.thenRun(() -> { - long now = expirationTicker().read(); - setAccessTime(node, now); - setWriteTime(node, now); - }); + synchronized (node) { + if (!Async.isReady((CompletableFuture) node.getValue())) { + long expirationTime = expirationTicker().read() + Async.MAXIMUM_EXPIRY; + setWriteTime(node, expirationTime); + setAccessTime(node, expirationTime); + setVariableTime(node, expirationTime); + } } } } @@ -1548,9 +1545,11 @@ public V getIfPresent(Object key, boolean recordStats) { @SuppressWarnings("unchecked") K castedKey = (K) key; V value = node.getValue(); - setVariableTime(node, expireAfterRead(node, castedKey, value, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, castedKey, value, now)); + setAccessTime(node, now); + } afterRead(node, now, recordStats); return value; } @@ -1590,8 +1589,10 @@ public Map getAllPresent(Iterable keys) { K castedKey = (K) entry.getKey(); @SuppressWarnings("unchecked") V castedValue = (V) entry.getValue(); - setVariableTime(node, expireAfterRead(node, castedKey, castedValue, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, castedKey, castedValue, now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ false); } @@ -2006,8 +2007,10 @@ public V computeIfAbsent(K key, Function mappingFunction if (node != null) { V value = node.getValue(); if ((value != 
null) && !hasExpired(node, now)) { - setVariableTime(node, expireAfterRead(node, key, value, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, value, now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ true); return value; @@ -2092,8 +2095,10 @@ V doComputeIfAbsent(K key, Object keyRef, statsCounter().recordEviction(weight[0]); } if (newValue[0] == null) { - setVariableTime(node, expireAfterRead(node, key, oldValue[0], now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, oldValue[0], now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ true); return oldValue[0]; @@ -2269,8 +2274,10 @@ V remap(K key, Object keyRef, BiFunction rema afterWrite(node, new UpdateTask(node, weightedDifference), now); } else { if (cause[0] == null) { - setVariableTime(node, expireAfterRead(node, key, newValue[0], now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, newValue[0], now)); + setAccessTime(node, now); + } } else if (cause[0] == RemovalCause.COLLECTED) { scheduleDrainBuffers(); } @@ -2327,7 +2334,7 @@ Map evictionOrder(int limit, Function transformer, boolean hottest) return PeekingIterator.concat(primary, accessOrderProtectedDeque().iterator()); } }; - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2345,7 +2352,7 @@ Map expireAfterAcessOrder(int limit, Function transformer, boolean o Supplier>> iteratorSupplier = () -> oldest ? accessOrderEdenDeque().iterator() : accessOrderEdenDeque().descendingIterator(); - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } Supplier>> iteratorSupplier = () -> { @@ -2364,7 +2371,7 @@ Map expireAfterAcessOrder(int limit, Function transformer, boolean o return PeekingIterator.comparing( PeekingIterator.comparing(first, second, comparator), third, comparator); }; - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2381,7 +2388,7 @@ Map expireAfterWriteOrder(int limit, Function transformer, boolean o Supplier>> iteratorSupplier = () -> oldest ? writeOrderDeque().iterator() : writeOrderDeque().descendingIterator(); - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2393,23 +2400,22 @@ Map expireAfterWriteOrder(int limit, Function transformer, boolean o * @param transformer a function that unwraps the value * @return an unmodifiable snapshot in the iterator's order */ - Map snapshot(Supplier>> iteratorSupplier, - Function transformer, int limit) { + Map fixedSnapshot(Supplier>> iteratorSupplier, + int limit, Function transformer) { requireArgument(limit >= 0); evictionLock.lock(); try { maintenance(/* ignored */ null); - int initialCapacity = - isWeighted() ? 16 : Math.min(limit, evicts() ? 
(int) adjustedWeightedSize() : size()); + int initialCapacity = Math.min(limit, size()); Iterator> iterator = iteratorSupplier.get(); Map map = new LinkedHashMap<>(initialCapacity); while ((map.size() < limit) && iterator.hasNext()) { Node node = iterator.next(); K key = node.getKey(); - V value = node.getValue(); + V value = transformer.apply(node.getValue()); if ((key != null) && (value != null) && node.isAlive()) { - map.put(key, transformer.apply(value)); + map.put(key, value); } } return Collections.unmodifiableMap(map); @@ -2418,6 +2424,26 @@ Map snapshot(Supplier>> iteratorSupplier, } } + /** + * Returns an unmodifiable snapshot map roughly ordered by the expiration time. The wheels are + * evaluated in order, but the timers that fall within the bucket's range are not sorted. Beware + * that obtaining the mappings is NOT a constant-time operation. + * + * @param ascending the direction + * @param limit the maximum number of entries + * @param transformer a function that unwraps the value + * @return an unmodifiable snapshot in the desired order + */ + Map variableSnapshot(boolean ascending, int limit, Function transformer) { + evictionLock.lock(); + try { + maintenance(/* ignored */ null); + return timerWheel().snapshot(ascending, limit, transformer); + } finally { + evictionLock.unlock(); + } + } + /** An adapter to safely externalize the keys. */ static final class KeySetView extends AbstractSet { final BoundedLocalCache cache; @@ -3040,6 +3066,7 @@ static final class BoundedPolicy implements Policy { Optional> refreshes; Optional> afterWrite; Optional> afterAccess; + Optional> variable; BoundedPolicy(BoundedLocalCache cache, Function transformer, boolean isWeighted) { this.transformer = transformer; @@ -3071,8 +3098,15 @@ static final class BoundedPolicy implements Policy { ? (afterWrite = Optional.of(new BoundedExpireAfterWrite())) : afterWrite; } - @Override - public Optional> refreshAfterWrite() { + @Override public Optional> expireVariably() { + if (!cache.expiresVariable()) { + return Optional.empty(); + } + return (variable == null) + ? (variable = Optional.of(new BoundedVarExpiration())) + : variable; + } + @Override public Optional> refreshAfterWrite() { if (!cache.refreshAfterWrite()) { return Optional.empty(); } @@ -3189,7 +3223,7 @@ final class BoundedExpireAfterWrite implements Expiration { } } - final class BoundedExpireVariably implements VarExpiration { + final class BoundedVarExpiration implements VarExpiration { @Override public OptionalLong getExpiresAfter(K key, TimeUnit unit) { requireNonNull(key); requireNonNull(unit); @@ -3212,11 +3246,11 @@ final class BoundedExpireVariably implements VarExpiration { node.setVariableTime(unit.convert(duration, TimeUnit.NANOSECONDS)); } } - @Override public Map oldest(int limit) { - throw new UnsupportedOperationException(); - } @Override public Map youngest(int limit) { - throw new UnsupportedOperationException(); + return cache.variableSnapshot(/* ascending */ true, limit, transformer); + } + @Override public Map oldest(int limit) { + return cache.variableSnapshot(/* ascending */ false, limit, transformer); } } @@ -3258,7 +3292,7 @@ Map sortedByWriteTime(int limit, boolean ascending) { Comparator> comparator = Comparator.comparingLong(Node::getWriteTime); Iterator> iterator = cache.data.values().stream().parallel().sorted( ascending ? 
comparator : comparator.reversed()).limit(limit).iterator(); - return cache.snapshot(() -> iterator, transformer, limit); + return cache.fixedSnapshot(() -> iterator, limit, transformer); } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java index 5941191cfb..68402eb017 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java @@ -15,12 +15,18 @@ */ package com.github.benmanes.caffeine.cache; +import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument; import static java.util.Objects.requireNonNull; import java.lang.ref.ReferenceQueue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -47,19 +53,21 @@ final class TimerWheel { * http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf */ - static final int[] BUCKETS = { 64, 64, 32, 4 }; + static final int[] BUCKETS = { 64, 64, 32, 4, 1 }; static final long[] SPANS = { ceilingPowerOfTwo(TimeUnit.SECONDS.toNanos(1)), // 1.07s ceilingPowerOfTwo(TimeUnit.MINUTES.toNanos(1)), // 1.14m ceilingPowerOfTwo(TimeUnit.HOURS.toNanos(1)), // 1.22h ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 1.63d BUCKETS[3] * ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 6.5d + BUCKETS[3] * ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 6.5d }; static final long[] SHIFT = { Long.SIZE - Long.numberOfLeadingZeros(SPANS[0] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[1] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[2] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[3] - 1), + Long.SIZE - Long.numberOfLeadingZeros(SPANS[4] - 1), }; final BoundedLocalCache cache; @@ -198,20 +206,15 @@ public void deschedule(Node node) { */ Node findBucket(long time) { long duration = time - nanos; - for (int i = 0; i < wheel.length; i++) { + int length = wheel.length - 1; + for (int i = 0; i < length; i++) { if (duration < SPANS[i + 1]) { int ticks = (int) (time >> SHIFT[i]); int index = ticks & (wheel[i].length - 1); return wheel[i][index]; } } - - // Add to the last timer bucket - int lastWheel = wheel.length - 1; - int buckets = wheel[lastWheel].length - 1; - int ticks = (int) (nanos >> SHIFT[lastWheel]) - 1; - int index = (ticks & buckets); - return wheel[lastWheel][index]; + return wheel[length][0]; } /** Adds the entry at the tail of the bucket's list. */ @@ -233,18 +236,65 @@ void unlink(Node node) { } } + /** + * Returns an unmodifiable snapshot map roughly ordered by the expiration time. The wheels are + * evaluated in order, but the timers that fall within the bucket's range are not sorted. Beware + * that obtaining the mappings is NOT a constant-time operation. + * + * @param ascending the direction + * @param limit the maximum number of entries + * @param transformer a function that unwraps the value + * @return an unmodifiable snapshot in the desired order + */ + public Map snapshot(boolean ascending, int limit, Function transformer) { + requireArgument(limit >= 0); + + Map map = new LinkedHashMap<>(Math.min(limit, cache.size())); + int startLevel = ascending ? 
0 : wheel.length - 1; + for (int i = 0; i < wheel.length; i++) { + int indexOffset = ascending ? i : -i; + int index = startLevel + indexOffset; + + int ticks = (int) (nanos >> SHIFT[index]); + int bucketMask = (wheel[index].length - 1); + int startBucket = (ticks & bucketMask) + (ascending ? 1 : 0); + for (int j = 0; j < wheel[index].length; j++) { + int bucketOffset = ascending ? j : -j; + Node sentinel = wheel[index][(startBucket + bucketOffset) & bucketMask]; + + for (Node node = traverse(ascending, sentinel); + node != sentinel; node = traverse(ascending, node)) { + if (map.size() >= limit) { + break; + } + + K key = node.getKey(); + V value = transformer.apply(node.getValue()); + if ((key != null) && (value != null) && node.isAlive()) { + map.put(key, value); + } + } + } + } + return Collections.unmodifiableMap(map); + } + + static Node traverse(boolean ascending, Node node) { + return ascending ? node.getNextInVariableOrder() : node.getPreviousInVariableOrder(); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < wheel.length; i++) { - Map buckets = new TreeMap<>(); + Map> buckets = new TreeMap<>(); for (int j = 0; j < wheel[i].length; j++) { - int events = 0; + List events = new ArrayList<>(); for (Node node = wheel[i][j].getNextInVariableOrder(); node != wheel[i][j]; node = node.getNextInVariableOrder()) { - events++; + events.add(node.getKey()); } - if (events > 0) { + if (!events.isEmpty()) { buckets.put(j, events); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java index f48e12f2c1..2b101e1128 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java @@ -15,6 +15,7 @@ */ package com.github.benmanes.caffeine.cache; +import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -28,21 +29,28 @@ import java.lang.ref.ReferenceQueue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.IntStream; import javax.annotation.Nullable; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; + import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; @@ -51,9 +59,9 @@ */ @Test(singleThreaded = true) public final class TimerWheelTest { - TimerWheel timerWheel; - @Mock BoundedLocalCache cache; - @Captor ArgumentCaptor> captor; + TimerWheel timerWheel; + @Mock BoundedLocalCache cache; + @Captor ArgumentCaptor> captor; @BeforeMethod public void beforeMethod() { @@ -159,7 +167,7 @@ public void reschedule() { private void checkEmpty() { for (int i = 0; i < timerWheel.wheel.length; i++) { for (int j = 0; j < timerWheel.wheel[i].length; j++) { - Node sentinel = timerWheel.wheel[i][j]; + Node sentinel = timerWheel.wheel[i][j]; 
assertThat(sentinel.getNextInVariableOrder(), is(sentinel)); assertThat(sentinel.getPreviousInVariableOrder(), is(sentinel)); } @@ -238,9 +246,43 @@ public Iterator providesCascade() { return args.iterator(); } - private static final class Timer implements Node { - Node prev; - Node next; + @Test(dataProvider = "snapshot") + public void snapshot(boolean ascending, int limit, long nanos, Function transformer) { + int count = 21; + timerWheel.nanos = nanos; + int expected = Math.min(limit, count); + Comparator order = ascending ? Comparator.naturalOrder() : Comparator.reverseOrder(); + List times = IntStream.range(0, count).mapToLong(i -> { + long time = nanos + TimeUnit.SECONDS.toNanos(2 << i); + timerWheel.schedule(new Timer(time)); + return time; + }).boxed().sorted(order).collect(toList()).subList(0, expected); + + when(transformer.apply(anyLong())).thenAnswer(invocation -> invocation.getArgument(0)); + assertThat(snapshot(ascending, limit, transformer), is(times)); + verify(transformer, times(expected)).apply(anyLong()); + } + + private List snapshot(boolean ascending, int limit, Function transformer) { + return ImmutableList.copyOf(timerWheel.snapshot(ascending, limit, transformer).keySet()); + } + + @DataProvider(name="snapshot") + public Iterator providesSnaphot() { + List scenarios = new ArrayList<>(); + for (long nanos : new long[] {0L, System.nanoTime() }) { + for (int limit : new int[] { 10, 100 }) { + scenarios.addAll(Arrays.asList( + new Object[] { /* ascending */ true, limit, nanos, Mockito.mock(Function.class) }, + new Object[] { /* ascending */ false, limit, nanos, Mockito.mock(Function.class) })); + } + } + return scenarios.iterator(); + } + + private static final class Timer implements Node { + Node prev; + Node next; long variableTime; Timer(long accessTime) { @@ -253,26 +295,26 @@ private static final class Timer implements Node { @Override public void setVariableTime(long variableTime) { this.variableTime = variableTime; } - @Override public Node getPreviousInVariableOrder() { + @Override public Node getPreviousInVariableOrder() { return prev; } - @Override public void setPreviousInVariableOrder(@Nullable Node prev) { + @Override public void setPreviousInVariableOrder(@Nullable Node prev) { this.prev = prev; } - @Override public Node getNextInVariableOrder() { + @Override public Node getNextInVariableOrder() { return next; } - @Override public void setNextInVariableOrder(@Nullable Node next) { + @Override public void setNextInVariableOrder(@Nullable Node next) { this.next = next; } - @Override public Integer getKey() { return null; } + @Override public Long getKey() { return variableTime; } @Override public Object getKeyReference() { return null; } - @Override public Integer getValue() { return null; } + @Override public Long getValue() { return variableTime; } @Override public Object getValueReference() { return null; } - @Override public void setValue(Integer value, ReferenceQueue referenceQueue) {} + @Override public void setValue(Long value, ReferenceQueue referenceQueue) {} @Override public boolean containsValue(Object value) { return false; } - @Override public boolean isAlive() { return false; } + @Override public boolean isAlive() { return true; } @Override public boolean isRetired() { return false; } @Override public boolean isDead() { return false; } @Override public void retire() {} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java 
b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java index 482fd64353..b229830c50 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -73,6 +74,8 @@ * @author ben.manes@gmail.com (Ben Manes) */ public final class CacheContext { + private static final long START_TIME = System.nanoTime() - TimeUnit.DAYS.toNanos(1); + final RemovalListener removalListener; final CacheWriter cacheWriter; final InitialCapacity initialCapacity; @@ -142,7 +145,7 @@ public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher w this.isAsyncLoading = isAsyncLoading; this.writer = requireNonNull(writer); this.cacheWriter = writer.create(); - this.ticker = new SerializableFakeTicker(); + this.ticker = new SerializableFakeTicker().advance(START_TIME); this.implementation = requireNonNull(implementation); this.original = new LinkedHashMap<>(); this.initialSize = -1; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 807c4ffd92..e64427505c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -60,7 +60,7 @@ ext { concurrentlinkedhashmap: '1.4.2', ehcache2: '2.10.4', ehcache3: '3.3.1', - elastic_search: '5.3.2', + elastic_search: '5.4.0', expiring_map: '0.5.8', jackrabbit: '1.6.1', jamm: '0.3.1',
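---

For reviewers, a small usage sketch of the inspection API this patch exposes. The expireVariably(), oldest(int), youngest(int), and getExpiresAfter(key, unit) calls mirror the VarExpiration additions above; the Caffeine.expireAfter(Expiry) builder entry point and the Expiry callback signatures are assumed from the pre-existing variable-expiration support and are not part of this diff, so treat this as an illustrative sketch rather than a definitive example:

    import java.util.Map;
    import java.util.OptionalLong;
    import java.util.concurrent.TimeUnit;

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import com.github.benmanes.caffeine.cache.Expiry;
    import com.github.benmanes.caffeine.cache.Policy.VarExpiration;

    public final class VarExpirationInspection {
      public static void main(String[] args) {
        // Assumed builder entry point for variable expiration; not part of this patch
        Cache<String, String> cache = Caffeine.newBuilder()
            .expireAfter(new Expiry<String, String>() {
              @Override public long expireAfterCreate(String key, String value, long currentTime) {
                return TimeUnit.MINUTES.toNanos(5); // each entry lives five minutes from creation
              }
              @Override public long expireAfterUpdate(String key, String value,
                  long currentTime, long currentDuration) {
                return currentDuration; // an update keeps the remaining duration
              }
              @Override public long expireAfterRead(String key, String value,
                  long currentTime, long currentDuration) {
                return currentDuration; // a read keeps the remaining duration
              }
            }).build();
        cache.put("a", "1");
        cache.put("b", "2");

        // The policy view added by this patch; empty unless variable expiration is enabled
        VarExpiration<String, String> expiration =
            cache.policy().expireVariably().orElseThrow(IllegalStateException::new);

        OptionalLong remaining = expiration.getExpiresAfter("a", TimeUnit.SECONDS);

        // Best-effort snapshots ordered by the timer wheel traversal; order within a
        // bucket is not computed, so adjacent timers may appear slightly out of order
        Map<String, String> oldest = expiration.oldest(10);
        Map<String, String> youngest = expiration.youngest(10);
        System.out.println(remaining + " " + oldest.keySet() + " " + youngest.keySet());
      }
    }

Since the overflow bucket replaces scheduling on the furthest-away bucket, these snapshots stay in wheel order even for timers beyond the last span; the per-bucket ordering remains a best guess, as noted in the commit message.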