diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index f0f9d6876c..dacc27b63f 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -867,7 +867,7 @@ void refreshIfNeeded(Node node, long now) { K key; V oldValue; long oldWriteTime = node.getWriteTime(); - long refreshWriteTime = isAsync ? Long.MAX_VALUE : now; + long refreshWriteTime = isAsync ? (now + Async.MAXIMUM_EXPIRY) : now; if (((now - oldWriteTime) > refreshAfterWriteNanos()) && ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null) && node.casWriteTime(oldWriteTime, refreshWriteTime)) { @@ -1331,18 +1331,15 @@ public void run() { } } - // Ensure that in-flight async computation cannot expire + // Ensure that in-flight async computation cannot expire (reset on a completion callback) if (isComputingAsync(node)) { - CompletableFuture future = (CompletableFuture) node.getValue(); - if (future != null) { - node.setVariableTime(Long.MAX_VALUE); - setAccessTime(node, Long.MAX_VALUE); - setWriteTime(node, Long.MAX_VALUE); - future.thenRun(() -> { - long now = expirationTicker().read(); - setAccessTime(node, now); - setWriteTime(node, now); - }); + synchronized (node) { + if (!Async.isReady((CompletableFuture) node.getValue())) { + long expirationTime = expirationTicker().read() + Async.MAXIMUM_EXPIRY; + setWriteTime(node, expirationTime); + setAccessTime(node, expirationTime); + setVariableTime(node, expirationTime); + } } } } @@ -1548,9 +1545,11 @@ public V getIfPresent(Object key, boolean recordStats) { @SuppressWarnings("unchecked") K castedKey = (K) key; V value = node.getValue(); - setVariableTime(node, expireAfterRead(node, castedKey, value, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, castedKey, value, now)); + setAccessTime(node, now); + } afterRead(node, now, recordStats); return value; } @@ -1590,8 +1589,10 @@ public Map getAllPresent(Iterable keys) { K castedKey = (K) entry.getKey(); @SuppressWarnings("unchecked") V castedValue = (V) entry.getValue(); - setVariableTime(node, expireAfterRead(node, castedKey, castedValue, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, castedKey, castedValue, now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ false); } @@ -2006,8 +2007,10 @@ public V computeIfAbsent(K key, Function mappingFunction if (node != null) { V value = node.getValue(); if ((value != null) && !hasExpired(node, now)) { - setVariableTime(node, expireAfterRead(node, key, value, now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, value, now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ true); return value; @@ -2092,8 +2095,10 @@ V doComputeIfAbsent(K key, Object keyRef, statsCounter().recordEviction(weight[0]); } if (newValue[0] == null) { - setVariableTime(node, expireAfterRead(node, key, oldValue[0], now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, oldValue[0], now)); + setAccessTime(node, now); + } afterRead(node, now, /* recordHit */ true); return oldValue[0]; @@ -2269,8 +2274,10 @@ V remap(K key, Object keyRef, BiFunction rema afterWrite(node, new UpdateTask(node, weightedDifference), now); } else { if (cause[0] == null) { - setVariableTime(node, expireAfterRead(node, key, newValue[0], now)); - setAccessTime(node, now); + if (!isComputingAsync(node)) { + setVariableTime(node, expireAfterRead(node, key, newValue[0], now)); + setAccessTime(node, now); + } } else if (cause[0] == RemovalCause.COLLECTED) { scheduleDrainBuffers(); } @@ -2327,7 +2334,7 @@ Map evictionOrder(int limit, Function transformer, boolean hottest) return PeekingIterator.concat(primary, accessOrderProtectedDeque().iterator()); } }; - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2345,7 +2352,7 @@ Map expireAfterAcessOrder(int limit, Function transformer, boolean o Supplier>> iteratorSupplier = () -> oldest ? accessOrderEdenDeque().iterator() : accessOrderEdenDeque().descendingIterator(); - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } Supplier>> iteratorSupplier = () -> { @@ -2364,7 +2371,7 @@ Map expireAfterAcessOrder(int limit, Function transformer, boolean o return PeekingIterator.comparing( PeekingIterator.comparing(first, second, comparator), third, comparator); }; - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2381,7 +2388,7 @@ Map expireAfterWriteOrder(int limit, Function transformer, boolean o Supplier>> iteratorSupplier = () -> oldest ? writeOrderDeque().iterator() : writeOrderDeque().descendingIterator(); - return snapshot(iteratorSupplier, transformer, limit); + return fixedSnapshot(iteratorSupplier, limit, transformer); } /** @@ -2393,23 +2400,22 @@ Map expireAfterWriteOrder(int limit, Function transformer, boolean o * @param transformer a function that unwraps the value * @return an unmodifiable snapshot in the iterator's order */ - Map snapshot(Supplier>> iteratorSupplier, - Function transformer, int limit) { + Map fixedSnapshot(Supplier>> iteratorSupplier, + int limit, Function transformer) { requireArgument(limit >= 0); evictionLock.lock(); try { maintenance(/* ignored */ null); - int initialCapacity = - isWeighted() ? 16 : Math.min(limit, evicts() ? (int) adjustedWeightedSize() : size()); + int initialCapacity = Math.min(limit, size()); Iterator> iterator = iteratorSupplier.get(); Map map = new LinkedHashMap<>(initialCapacity); while ((map.size() < limit) && iterator.hasNext()) { Node node = iterator.next(); K key = node.getKey(); - V value = node.getValue(); + V value = transformer.apply(node.getValue()); if ((key != null) && (value != null) && node.isAlive()) { - map.put(key, transformer.apply(value)); + map.put(key, value); } } return Collections.unmodifiableMap(map); @@ -2418,6 +2424,26 @@ Map snapshot(Supplier>> iteratorSupplier, } } + /** + * Returns an unmodifiable snapshot map roughly ordered by the expiration time. The wheels are + * evaluated in order, but the timers that fall within the bucket's range are not sorted. Beware + * that obtaining the mappings is NOT a constant-time operation. + * + * @param ascending the direction + * @param limit the maximum number of entries + * @param transformer a function that unwraps the value + * @return an unmodifiable snapshot in the desired order + */ + Map variableSnapshot(boolean ascending, int limit, Function transformer) { + evictionLock.lock(); + try { + maintenance(/* ignored */ null); + return timerWheel().snapshot(ascending, limit, transformer); + } finally { + evictionLock.unlock(); + } + } + /** An adapter to safely externalize the keys. */ static final class KeySetView extends AbstractSet { final BoundedLocalCache cache; @@ -3040,6 +3066,7 @@ static final class BoundedPolicy implements Policy { Optional> refreshes; Optional> afterWrite; Optional> afterAccess; + Optional> variable; BoundedPolicy(BoundedLocalCache cache, Function transformer, boolean isWeighted) { this.transformer = transformer; @@ -3071,8 +3098,15 @@ static final class BoundedPolicy implements Policy { ? (afterWrite = Optional.of(new BoundedExpireAfterWrite())) : afterWrite; } - @Override - public Optional> refreshAfterWrite() { + @Override public Optional> expireVariably() { + if (!cache.expiresVariable()) { + return Optional.empty(); + } + return (variable == null) + ? (variable = Optional.of(new BoundedVarExpiration())) + : variable; + } + @Override public Optional> refreshAfterWrite() { if (!cache.refreshAfterWrite()) { return Optional.empty(); } @@ -3189,7 +3223,7 @@ final class BoundedExpireAfterWrite implements Expiration { } } - final class BoundedExpireVariably implements VarExpiration { + final class BoundedVarExpiration implements VarExpiration { @Override public OptionalLong getExpiresAfter(K key, TimeUnit unit) { requireNonNull(key); requireNonNull(unit); @@ -3212,11 +3246,11 @@ final class BoundedExpireVariably implements VarExpiration { node.setVariableTime(unit.convert(duration, TimeUnit.NANOSECONDS)); } } - @Override public Map oldest(int limit) { - throw new UnsupportedOperationException(); - } @Override public Map youngest(int limit) { - throw new UnsupportedOperationException(); + return cache.variableSnapshot(/* ascending */ true, limit, transformer); + } + @Override public Map oldest(int limit) { + return cache.variableSnapshot(/* ascending */ false, limit, transformer); } } @@ -3258,7 +3292,7 @@ Map sortedByWriteTime(int limit, boolean ascending) { Comparator> comparator = Comparator.comparingLong(Node::getWriteTime); Iterator> iterator = cache.data.values().stream().parallel().sorted( ascending ? comparator : comparator.reversed()).limit(limit).iterator(); - return cache.snapshot(() -> iterator, transformer, limit); + return cache.fixedSnapshot(() -> iterator, limit, transformer); } } } diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java index 5941191cfb..68402eb017 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java @@ -15,12 +15,18 @@ */ package com.github.benmanes.caffeine.cache; +import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument; import static java.util.Objects.requireNonNull; import java.lang.ref.ReferenceQueue; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import javax.annotation.Nullable; import javax.annotation.concurrent.NotThreadSafe; @@ -47,19 +53,21 @@ final class TimerWheel { * http://www.cs.columbia.edu/~nahum/w6998/papers/ton97-timing-wheels.pdf */ - static final int[] BUCKETS = { 64, 64, 32, 4 }; + static final int[] BUCKETS = { 64, 64, 32, 4, 1 }; static final long[] SPANS = { ceilingPowerOfTwo(TimeUnit.SECONDS.toNanos(1)), // 1.07s ceilingPowerOfTwo(TimeUnit.MINUTES.toNanos(1)), // 1.14m ceilingPowerOfTwo(TimeUnit.HOURS.toNanos(1)), // 1.22h ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 1.63d BUCKETS[3] * ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 6.5d + BUCKETS[3] * ceilingPowerOfTwo(TimeUnit.DAYS.toNanos(1)), // 6.5d }; static final long[] SHIFT = { Long.SIZE - Long.numberOfLeadingZeros(SPANS[0] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[1] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[2] - 1), Long.SIZE - Long.numberOfLeadingZeros(SPANS[3] - 1), + Long.SIZE - Long.numberOfLeadingZeros(SPANS[4] - 1), }; final BoundedLocalCache cache; @@ -198,20 +206,15 @@ public void deschedule(Node node) { */ Node findBucket(long time) { long duration = time - nanos; - for (int i = 0; i < wheel.length; i++) { + int length = wheel.length - 1; + for (int i = 0; i < length; i++) { if (duration < SPANS[i + 1]) { int ticks = (int) (time >> SHIFT[i]); int index = ticks & (wheel[i].length - 1); return wheel[i][index]; } } - - // Add to the last timer bucket - int lastWheel = wheel.length - 1; - int buckets = wheel[lastWheel].length - 1; - int ticks = (int) (nanos >> SHIFT[lastWheel]) - 1; - int index = (ticks & buckets); - return wheel[lastWheel][index]; + return wheel[length][0]; } /** Adds the entry at the tail of the bucket's list. */ @@ -233,18 +236,65 @@ void unlink(Node node) { } } + /** + * Returns an unmodifiable snapshot map roughly ordered by the expiration time. The wheels are + * evaluated in order, but the timers that fall within the bucket's range are not sorted. Beware + * that obtaining the mappings is NOT a constant-time operation. + * + * @param ascending the direction + * @param limit the maximum number of entries + * @param transformer a function that unwraps the value + * @return an unmodifiable snapshot in the desired order + */ + public Map snapshot(boolean ascending, int limit, Function transformer) { + requireArgument(limit >= 0); + + Map map = new LinkedHashMap<>(Math.min(limit, cache.size())); + int startLevel = ascending ? 0 : wheel.length - 1; + for (int i = 0; i < wheel.length; i++) { + int indexOffset = ascending ? i : -i; + int index = startLevel + indexOffset; + + int ticks = (int) (nanos >> SHIFT[index]); + int bucketMask = (wheel[index].length - 1); + int startBucket = (ticks & bucketMask) + (ascending ? 1 : 0); + for (int j = 0; j < wheel[index].length; j++) { + int bucketOffset = ascending ? j : -j; + Node sentinel = wheel[index][(startBucket + bucketOffset) & bucketMask]; + + for (Node node = traverse(ascending, sentinel); + node != sentinel; node = traverse(ascending, node)) { + if (map.size() >= limit) { + break; + } + + K key = node.getKey(); + V value = transformer.apply(node.getValue()); + if ((key != null) && (value != null) && node.isAlive()) { + map.put(key, value); + } + } + } + } + return Collections.unmodifiableMap(map); + } + + static Node traverse(boolean ascending, Node node) { + return ascending ? node.getNextInVariableOrder() : node.getPreviousInVariableOrder(); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); for (int i = 0; i < wheel.length; i++) { - Map buckets = new TreeMap<>(); + Map> buckets = new TreeMap<>(); for (int j = 0; j < wheel[i].length; j++) { - int events = 0; + List events = new ArrayList<>(); for (Node node = wheel[i][j].getNextInVariableOrder(); node != wheel[i][j]; node = node.getNextInVariableOrder()) { - events++; + events.add(node.getKey()); } - if (events > 0) { + if (!events.isEmpty()) { buckets.put(j, events); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java index f48e12f2c1..2b101e1128 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java @@ -15,6 +15,7 @@ */ package com.github.benmanes.caffeine.cache; +import static java.util.stream.Collectors.toList; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; @@ -28,21 +29,28 @@ import java.lang.ref.ReferenceQueue; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.IntStream; import javax.annotation.Nullable; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import com.google.common.collect.ImmutableList; + import it.unimi.dsi.fastutil.longs.LongArrayList; import it.unimi.dsi.fastutil.longs.LongList; @@ -51,9 +59,9 @@ */ @Test(singleThreaded = true) public final class TimerWheelTest { - TimerWheel timerWheel; - @Mock BoundedLocalCache cache; - @Captor ArgumentCaptor> captor; + TimerWheel timerWheel; + @Mock BoundedLocalCache cache; + @Captor ArgumentCaptor> captor; @BeforeMethod public void beforeMethod() { @@ -159,7 +167,7 @@ public void reschedule() { private void checkEmpty() { for (int i = 0; i < timerWheel.wheel.length; i++) { for (int j = 0; j < timerWheel.wheel[i].length; j++) { - Node sentinel = timerWheel.wheel[i][j]; + Node sentinel = timerWheel.wheel[i][j]; assertThat(sentinel.getNextInVariableOrder(), is(sentinel)); assertThat(sentinel.getPreviousInVariableOrder(), is(sentinel)); } @@ -238,9 +246,43 @@ public Iterator providesCascade() { return args.iterator(); } - private static final class Timer implements Node { - Node prev; - Node next; + @Test(dataProvider = "snapshot") + public void snapshot(boolean ascending, int limit, long nanos, Function transformer) { + int count = 21; + timerWheel.nanos = nanos; + int expected = Math.min(limit, count); + Comparator order = ascending ? Comparator.naturalOrder() : Comparator.reverseOrder(); + List times = IntStream.range(0, count).mapToLong(i -> { + long time = nanos + TimeUnit.SECONDS.toNanos(2 << i); + timerWheel.schedule(new Timer(time)); + return time; + }).boxed().sorted(order).collect(toList()).subList(0, expected); + + when(transformer.apply(anyLong())).thenAnswer(invocation -> invocation.getArgument(0)); + assertThat(snapshot(ascending, limit, transformer), is(times)); + verify(transformer, times(expected)).apply(anyLong()); + } + + private List snapshot(boolean ascending, int limit, Function transformer) { + return ImmutableList.copyOf(timerWheel.snapshot(ascending, limit, transformer).keySet()); + } + + @DataProvider(name="snapshot") + public Iterator providesSnaphot() { + List scenarios = new ArrayList<>(); + for (long nanos : new long[] {0L, System.nanoTime() }) { + for (int limit : new int[] { 10, 100 }) { + scenarios.addAll(Arrays.asList( + new Object[] { /* ascending */ true, limit, nanos, Mockito.mock(Function.class) }, + new Object[] { /* ascending */ false, limit, nanos, Mockito.mock(Function.class) })); + } + } + return scenarios.iterator(); + } + + private static final class Timer implements Node { + Node prev; + Node next; long variableTime; Timer(long accessTime) { @@ -253,26 +295,26 @@ private static final class Timer implements Node { @Override public void setVariableTime(long variableTime) { this.variableTime = variableTime; } - @Override public Node getPreviousInVariableOrder() { + @Override public Node getPreviousInVariableOrder() { return prev; } - @Override public void setPreviousInVariableOrder(@Nullable Node prev) { + @Override public void setPreviousInVariableOrder(@Nullable Node prev) { this.prev = prev; } - @Override public Node getNextInVariableOrder() { + @Override public Node getNextInVariableOrder() { return next; } - @Override public void setNextInVariableOrder(@Nullable Node next) { + @Override public void setNextInVariableOrder(@Nullable Node next) { this.next = next; } - @Override public Integer getKey() { return null; } + @Override public Long getKey() { return variableTime; } @Override public Object getKeyReference() { return null; } - @Override public Integer getValue() { return null; } + @Override public Long getValue() { return variableTime; } @Override public Object getValueReference() { return null; } - @Override public void setValue(Integer value, ReferenceQueue referenceQueue) {} + @Override public void setValue(Long value, ReferenceQueue referenceQueue) {} @Override public boolean containsValue(Object value) { return false; } - @Override public boolean isAlive() { return false; } + @Override public boolean isAlive() { return true; } @Override public boolean isRetired() { return false; } @Override public boolean isDead() { return false; } @Override public void retire() {} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java index 482fd64353..b229830c50 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; @@ -73,6 +74,8 @@ * @author ben.manes@gmail.com (Ben Manes) */ public final class CacheContext { + private static final long START_TIME = System.nanoTime() - TimeUnit.DAYS.toNanos(1); + final RemovalListener removalListener; final CacheWriter cacheWriter; final InitialCapacity initialCapacity; @@ -142,7 +145,7 @@ public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher w this.isAsyncLoading = isAsyncLoading; this.writer = requireNonNull(writer); this.cacheWriter = writer.create(); - this.ticker = new SerializableFakeTicker(); + this.ticker = new SerializableFakeTicker().advance(START_TIME); this.implementation = requireNonNull(implementation); this.original = new LinkedHashMap<>(); this.initialSize = -1; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 807c4ffd92..e64427505c 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -60,7 +60,7 @@ ext { concurrentlinkedhashmap: '1.4.2', ehcache2: '2.10.4', ehcache3: '3.3.1', - elastic_search: '5.3.2', + elastic_search: '5.4.0', expiring_map: '0.5.8', jackrabbit: '1.6.1', jamm: '0.3.1',