diff --git a/.travis.yml b/.travis.yml index 2ea892ff5d..d6ee6ceb19 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,10 @@ language: java sudo: required +dist: xenial jdk: - - oraclejdk8 - - oraclejdk9 + - openjdk8 + - openjdk11 env: global: @@ -17,16 +18,7 @@ install: true script: - ./travis.sh $GROUP -matrix: - allow_failures: - - jdk: oraclejdk9 - fast_finish: true - addons: - apt: - packages: - - oracle-java8-installer - - oracle-java9-installer sonarcloud: organization: caffeine token: diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Feature.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Feature.java index 27c426dcb8..4c9953734b 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Feature.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Feature.java @@ -99,6 +99,11 @@ public static boolean usesExpirationTicker(Set features) { || features.contains(Feature.REFRESH_WRITE); } + public static boolean usesExpiration(Set features) { + return features.contains(Feature.EXPIRE_ACCESS) + || features.contains(Feature.EXPIRE_WRITE); + } + public static boolean usesMaximum(Set features) { return features.contains(Feature.MAXIMUM_SIZE) || features.contains(Feature.MAXIMUM_WEIGHT); diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/LocalCacheFactoryGenerator.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/LocalCacheFactoryGenerator.java index f4de51e1df..8ef47429c7 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/LocalCacheFactoryGenerator.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/LocalCacheFactoryGenerator.java @@ -50,6 +50,7 @@ import com.github.benmanes.caffeine.cache.local.AddMaximum; import com.github.benmanes.caffeine.cache.local.AddRefreshAfterWrite; import com.github.benmanes.caffeine.cache.local.AddRemovalListener; +import com.github.benmanes.caffeine.cache.local.AddPacer; import com.github.benmanes.caffeine.cache.local.AddStats; import com.github.benmanes.caffeine.cache.local.AddSubtype; import com.github.benmanes.caffeine.cache.local.AddWriteBuffer; @@ -83,7 +84,7 @@ public final class LocalCacheFactoryGenerator { new AddKeyValueStrength(), new AddRemovalListener(), new AddStats(), new AddExpirationTicker(), new AddMaximum(), new AddFastPath(), new AddDeques(), new AddExpireAfterAccess(), new AddExpireAfterWrite(), new AddRefreshAfterWrite(), - new AddWriteBuffer(), new Finalize()); + new AddWriteBuffer(), new AddPacer(), new Finalize()); final ZoneId timeZone = ZoneId.of("America/Los_Angeles"); final Path directory; diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java index d3e452c4e7..f81c791e88 100644 --- a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/Specifications.java @@ -88,6 +88,7 @@ public final class Specifications { public static final TypeName STATS_COUNTER = ClassName.get(PACKAGE_NAME + ".stats", "StatsCounter"); public static final TypeName TICKER = ClassName.get(PACKAGE_NAME, "Ticker"); + public static final TypeName PACER = ClassName.get(PACKAGE_NAME, "Pacer"); public static final TypeName ACCESS_ORDER_DEQUE = ParameterizedTypeName.get(ClassName.get(PACKAGE_NAME, "AccessOrderDeque"), NODE); diff --git a/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/local/AddPacer.java b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/local/AddPacer.java new file mode 100644 index 0000000000..ccca3b0819 --- /dev/null +++ b/caffeine/src/javaPoet/java/com/github/benmanes/caffeine/cache/local/AddPacer.java @@ -0,0 +1,48 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache.local; + +import static com.github.benmanes.caffeine.cache.Specifications.PACER; + +import javax.lang.model.element.Modifier; + +import com.github.benmanes.caffeine.cache.Feature; +import com.squareup.javapoet.FieldSpec; +import com.squareup.javapoet.MethodSpec; + +/** + * @author ben.manes@gmail.com (Ben Manes) + */ +public final class AddPacer extends LocalCacheRule { + + @Override + protected boolean applies() { + return !(Feature.usesExpiration(context.parentFeatures) + || !Feature.usesExpiration(context.generateFeatures)); + } + + @Override + protected void execute() { + context.constructor.addStatement("this.pacer = ($1L == $2L)\n? null\n: new Pacer($1L)", + "builder.getScheduler()", "Scheduler.disabledScheduler()"); + context.cache.addField(FieldSpec.builder(PACER, "pacer", Modifier.FINAL).build()); + context.cache.addMethod(MethodSpec.methodBuilder("pacer") + .addModifiers(context.publicFinalModifiers()) + .addStatement("return pacer") + .returns(PACER) + .build()); + } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java index 6967871f00..dbcc4b4547 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedLocalCache.java @@ -16,6 +16,7 @@ package com.github.benmanes.caffeine.cache; import static com.github.benmanes.caffeine.cache.Async.ASYNC_EXPIRY; +import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo; import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument; import static com.github.benmanes.caffeine.cache.Caffeine.requireState; import static com.github.benmanes.caffeine.cache.LocalLoadingCache.newBulkMappingFunction; @@ -30,6 +31,7 @@ import java.io.Serializable; import java.lang.ref.Reference; import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; import java.util.AbstractCollection; import java.util.AbstractMap; import java.util.AbstractSet; @@ -234,7 +236,7 @@ protected BoundedLocalCache(Caffeine builder, writer = builder.getCacheWriter(); evictionLock = new ReentrantLock(); weigher = builder.getWeigher(isAsync); - drainBuffersTask = new PerformCleanupTask(); + drainBuffersTask = new PerformCleanupTask(this); nodeFactory = NodeFactory.newFactory(builder, isAsync); data = new ConcurrentHashMap<>(builder.getInitialCapacity()); readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess() @@ -247,11 +249,6 @@ protected BoundedLocalCache(Caffeine builder, } } - static int ceilingPowerOfTwo(int x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1 << -Integer.numberOfLeadingZeros(x - 1); - } - /* --------------- Shared --------------- */ /** Returns if the node's value is currently being computed, asynchronously. */ @@ -339,7 +336,7 @@ public void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause } }; try { - executor().execute(task); + executor.execute(task); } catch (Throwable t) { logger.log(Level.SEVERE, "Exception thrown when submitting removal listener", t); task.run(); @@ -370,6 +367,11 @@ protected ReferenceQueue valueReferenceQueue() { /* --------------- Expiration Support --------------- */ + /** Returns the {@link Pacer} used to schedule the maintenance task. */ + protected @Nullable Pacer pacer() { + return null; + } + /** Returns if the cache expires entries after a variable time threshold. */ protected boolean expiresVariable() { return false; @@ -780,6 +782,13 @@ void expireEntries() { expireAfterAccessEntries(now); expireAfterWriteEntries(now); expireVariableEntries(now); + + if (pacer() != null) { + long delay = getExpirationDelay(now); + if (delay != Long.MAX_VALUE) { + pacer().schedule(executor, drainBuffersTask, now, delay); + } + } } /** Expires entries in the access-order queue. */ @@ -833,6 +842,38 @@ void expireVariableEntries(long now) { } } + /** Returns the duration until the next item expires, or {@link Long.MAX_VALUE} if none. */ + @GuardedBy("evictionLock") + private long getExpirationDelay(long now) { + long delay = Long.MAX_VALUE; + if (expiresAfterAccess()) { + Node node = accessOrderWindowDeque().peekFirst(); + if (node != null) { + delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos()); + } + if (evicts()) { + node = accessOrderProbationDeque().peekFirst(); + if (node != null) { + delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos()); + } + node = accessOrderProtectedDeque().peekFirst(); + if (node != null) { + delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos()); + } + } + } + if (expiresAfterWrite()) { + Node node = writeOrderDeque().peekFirst(); + if (node != null) { + delay = Math.min(delay, now - node.getWriteTime() + expiresAfterWriteNanos()); + } + } + if (expiresVariable()) { + delay = Math.min(delay, timerWheel().getExpirationDelay()); + } + return delay; + } + /** Returns if the entry has expired. */ @SuppressWarnings("ShortCircuitBoolean") boolean hasExpired(Node node, long now) { @@ -1375,7 +1416,7 @@ void scheduleDrainBuffers() { return; } lazySetDrainStatus(PROCESSING_TO_IDLE); - executor().execute(drainBuffersTask); + executor.execute(drainBuffersTask); } catch (Throwable t) { logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t); maintenance(/* ignored */ null); @@ -3259,9 +3300,15 @@ public int characteristics() { } /** A reusable task that performs the maintenance work; used to avoid wrapping by ForkJoinPool. */ - final class PerformCleanupTask extends ForkJoinTask implements Runnable { + static final class PerformCleanupTask extends ForkJoinTask implements Runnable { private static final long serialVersionUID = 1L; + final WeakReference> reference; + + PerformCleanupTask(BoundedLocalCache cache) { + reference = new WeakReference>(cache); + } + @Override public boolean exec() { try { @@ -3276,7 +3323,10 @@ public boolean exec() { @Override public void run() { - performCleanUp(/* ignored */ null); + BoundedLocalCache cache = reference.get(); + if (cache != null) { + cache.performCleanUp(/* ignored */ null); + } } /** diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java index 276815c6ff..272df94103 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Caffeine.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.logging.Level; @@ -98,10 +99,11 @@ *

* If {@linkplain #expireAfter(Expiry) expireAfter}, * {@linkplain #expireAfterWrite expireAfterWrite}, or - * {@linkplain #expireAfterAccess expireAfterAccess} is requested entries may be evicted on each - * cache modification, on occasional cache accesses, or on calls to {@link Cache#cleanUp}. Expired - * entries may be counted by {@link Cache#estimatedSize()}, but will never be visible to read or - * write operations. + * {@linkplain #expireAfterAccess expireAfterAccess} is requested then entries may be evicted on + * each cache modification, on occasional cache accesses, or on calls to {@link Cache#cleanUp}. A + * {@linkplain #scheduler(Scheduler)} may be specified to provide prompt removal of expired entries + * rather than waiting until activity triggers the periodic maintenance. Expired entries may be + * counted by {@link Cache#estimatedSize()}, but will never be visible to read or write operations. *

* If {@linkplain #weakKeys weakKeys}, {@linkplain #weakValues weakValues}, or * {@linkplain #softValues softValues} are requested, it is possible for a key or value present in @@ -159,6 +161,7 @@ enum Strength { WEAK, SOFT } @Nullable CacheWriter writer; @Nullable Weigher weigher; @Nullable Expiry expiry; + @Nullable Scheduler scheduler; @Nullable Executor executor; @Nullable Ticker ticker; @@ -195,6 +198,18 @@ static void requireState(boolean expression, String template, @Nullable Object.. } } + /** Returns the smallest power of two greater than or equal to {@code x}. */ + static int ceilingPowerOfTwo(int x) { + // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. + return 1 << -Integer.numberOfLeadingZeros(x - 1); + } + + /** Returns the smallest power of two greater than or equal to {@code x}. */ + static long ceilingPowerOfTwo(long x) { + // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. + return 1L << -Long.numberOfLeadingZeros(x - 1); + } + /** * Constructs a new {@code Caffeine} instance with default settings, including strong keys, strong * values, and no automatic eviction of any kind. @@ -289,6 +304,43 @@ Executor getExecutor() { return (executor == null) ? ForkJoinPool.commonPool() : executor; } + /** + * Specifies the scheduler to use when scheduling routine maintenance based on an expiration + * event. This augments the periodic maintenance that occurs during normal cache operations to + * allow for the promptly removal of expired entries regardless of whether any cache activity is + * occurring at that time. By default, {@link Scheduler#disabledScheduler()} is used. + *

+ * The scheduling between expiration events is paced to exploit batching and to minimize + * executions in short succession. This minimum difference between the scheduled executions is + * implementation-specific, currently at ~1 second (2^30 ns). In addition, the provided scheduler + * may not offer real-time guarantees (including {@link ThreadPoolExecutor}). The scheduling is + * best-effort and does not make any hard guarantees of when an expired entry will be removed. + *

+ * Note for Java 9 and later: consider using {@link Scheduler#systemScheduler()} to + * leverage the dedicated, system-wide scheduling thread. + * + * @param scheduler the scheduler that submits a task to the {@link #executor(Executor)} after a + * given delay + * @return this {@code Caffeine} instance (for chaining) + * @throws NullPointerException if the specified scheduler is null + */ + @NonNull + public Caffeine scheduler(@NonNull Scheduler scheduler) { + requireState(this.scheduler == null, "scheduler was already set to %s", this.scheduler); + this.scheduler = requireNonNull(scheduler); + return this; + } + + @NonNull + Scheduler getScheduler() { + if ((scheduler == null) || (scheduler == Scheduler.disabledScheduler())) { + return Scheduler.disabledScheduler(); + } else if (scheduler == Scheduler.systemScheduler()) { + return scheduler; + } + return Scheduler.guardedScheduler(scheduler); + } + /** * Specifies the maximum number of entries the cache may contain. Note that the cache may evict * an entry before this limit is exceeded or temporarily exceed the threshold while evicting. diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java index af3d37caa4..1bca9eb773 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/LocalCache.java @@ -50,19 +50,16 @@ interface LocalCache extends ConcurrentMap { void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause); /** Returns the {@link Executor} used by this cache. */ - @NonNull - Executor executor(); + @NonNull Executor executor(); /** Returns whether the cache captures the write time of the entry. */ boolean hasWriteTime(); /** Returns the {@link Ticker} used by this cache for expiration. */ - @NonNull - Ticker expirationTicker(); + @NonNull Ticker expirationTicker(); /** Returns the {@link Ticker} used by this cache for statistics. */ - @NonNull - Ticker statsTicker(); + @NonNull Ticker statsTicker(); /** See {@link Cache#estimatedSize()}. */ long estimatedSize(); diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java index 7ef58ec026..8727903be9 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/MpscGrowableArrayQueue.java @@ -14,7 +14,7 @@ package com.github.benmanes.caffeine.cache; import static com.github.benmanes.caffeine.base.UnsafeAccess.UNSAFE; -import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo; +import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo; import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ARRAY_BASE; import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.lvElement; diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java new file mode 100644 index 0000000000..7bededbcc0 --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Pacer.java @@ -0,0 +1,84 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo; +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A pacing scheduler that prevents executions from happening too frequently. Only one task may be + * scheduled at any given time, the earliest pending task takes precedence, and the delay may be + * increased if it is less than a tolerance threshold. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +final class Pacer { + static final long TOLERANCE = ceilingPowerOfTwo(TimeUnit.SECONDS.toNanos(1)); // 1.07s + + final Scheduler scheduler; + + long nextFireTime; + @Nullable Future future; + + Pacer(Scheduler scheduler) { + this.scheduler = requireNonNull(scheduler); + } + + /** Schedules the task, pacing the execution if occurring too often. */ + public void schedule(Executor executor, Runnable command, long now, long delay) { + long scheduleAt = (now + delay); + + if (future == null) { + // short-circuit an immediate scheduler causing an infinite loop during initialization + if (nextFireTime != 0) { + return; + } + } else if ((nextFireTime - now) > 0) { + // Determine whether to reschedule + if (maySkip(scheduleAt)) { + return; + } + future.cancel(/* mayInterruptIfRunning */ false); + } + long actualDelay = calculateSchedule(now, delay, scheduleAt); + future = scheduler.schedule(executor, command, actualDelay, TimeUnit.NANOSECONDS); + } + + /** + * Returns if the current fire time is sooner, or if it is later and within the tolerance limit. + */ + boolean maySkip(long scheduleAt) { + long delta = (scheduleAt - nextFireTime); + return (delta >= 0) || (-delta <= TOLERANCE); + } + + /** Returns the delay and sets the next fire time. */ + long calculateSchedule(long now, long delay, long scheduleAt) { + if (delay <= TOLERANCE) { + // Use a minimum delay if close to now + nextFireTime = (now + TOLERANCE); + return TOLERANCE; + } + nextFireTime = scheduleAt; + return delay; + } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Scheduler.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Scheduler.java new file mode 100644 index 0000000000..e64ca177bc --- /dev/null +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/Scheduler.java @@ -0,0 +1,220 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static java.util.Objects.requireNonNull; + +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A scheduler that submit a task to an executor after a given delay. + * + * @author ben.manes@gmail.com (Ben Manes) + */ +@FunctionalInterface +public interface Scheduler { + + /** + * Returns a future that will submit the task to the given executor after the given delay. + * + * @param executor the executor to run the task + * @param command the runnable task to schedule + * @param delay how long to delay, in units of {@code unit} + * @param unit a {@code TimeUnit} determining how to interpret the {@code delay} parameter + * @return a scheduled future representing pending completion of the task + */ + @NonNull Future schedule(@NonNull Executor executor, + @NonNull Runnable command, long delay, @NonNull TimeUnit unit); + + /** + * Returns a scheduler that always returns a successfully completed future. + * + * @return a scheduler that always returns a successfully completed future + */ + static @NonNull Scheduler disabledScheduler() { + return DisabledScheduler.INSTANCE; + } + + /** + * Returns a scheduler that uses the system-wide scheduling thread if available, or else returns + * {@link #disabledScheduler()} if not present. This scheduler is provided in Java 9 or above + * by using {@code CompletableFuture} {@code delayedExecutor}. + * + * @return a scheduler that uses the system-wide scheduling thread if available, or else a + * disabled scheduler + */ + static @NonNull Scheduler systemScheduler() { + return SystemScheduler.isPresent() ? SystemScheduler.INSTANCE : disabledScheduler() ; + } + + /** + * Returns a scheduler that delegates to the a {@link ScheduledExecutorService}. + * + * @param scheduledExecutorService the executor to schedule on + * @return a scheduler that delegates to the a {@link ScheduledExecutorService} + */ + static @NonNull Scheduler forScheduledExecutorService( + @NonNull ScheduledExecutorService scheduledExecutorService) { + return new ExecutorServiceScheduler(scheduledExecutorService); + } + + /** + * Returns a scheduler that suppresses and logs any exception thrown by the delegate + * {@code scheduler}. + * + * @param scheduler the scheduler to delegate to + * @return an scheduler that suppresses and logs any exception thrown by the delegate + */ + static @NonNull Scheduler guardedScheduler(@NonNull Scheduler scheduler) { + return new GuardedScheduler(scheduler); + } +} + +enum SystemScheduler implements Scheduler { + INSTANCE; + + static final @Nullable Method delayedExecutor = getDelayedExecutorMethod(); + + @Override + @SuppressWarnings("NullAway") + public Future schedule(Executor executor, Runnable command, long delay, TimeUnit unit) { + requireNonNull(executor); + requireNonNull(command); + requireNonNull(unit); + + try { + Executor scheduler = (Executor) delayedExecutor.invoke( + CompletableFuture.class, delay, unit, executor); + return CompletableFuture.runAsync(command, scheduler); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + static @Nullable Method getDelayedExecutorMethod() { + try { + return CompletableFuture.class.getMethod( + "delayedExecutor", long.class, TimeUnit.class, Executor.class); + } catch (NoSuchMethodException | SecurityException e) { + return null; + } + } + + static boolean isPresent() { + return (delayedExecutor != null); + } +} + +final class ExecutorServiceScheduler implements Scheduler, Serializable { + static final Logger logger = Logger.getLogger(ExecutorServiceScheduler.class.getName()); + static final long serialVersionUID = 1; + + final ScheduledExecutorService scheduledExecutorService; + + ExecutorServiceScheduler(ScheduledExecutorService scheduledExecutorService) { + this.scheduledExecutorService = requireNonNull(scheduledExecutorService); + } + + @Override + public Future schedule(Executor executor, Runnable command, long delay, TimeUnit unit) { + requireNonNull(executor); + requireNonNull(command); + requireNonNull(unit); + + if (scheduledExecutorService.isShutdown()) { + return DisabledFuture.INSTANCE; + } + return scheduledExecutorService.schedule(() -> { + try { + executor.execute(command); + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown when submitting scheduled task", t); + throw t; + } + }, delay, unit); + } +} + +final class GuardedScheduler implements Scheduler, Serializable { + static final Logger logger = Logger.getLogger(GuardedScheduler.class.getName()); + static final long serialVersionUID = 1; + + final Scheduler delegate; + + GuardedScheduler(Scheduler delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public @NonNull Future schedule(@NonNull Executor executor, + @NonNull Runnable command, long delay, @NonNull TimeUnit unit) { + try { + Future future = delegate.schedule(executor, command, delay, unit); + return (future == null) ? DisabledFuture.INSTANCE : future; + } catch (Throwable t) { + logger.log(Level.WARNING, "Exception thrown by scheduler; discarded task", t); + return DisabledFuture.INSTANCE; + } + } +} + +enum DisabledScheduler implements Scheduler { + INSTANCE; + + @Override + public Future schedule(Executor executor, Runnable command, long delay, TimeUnit unit) { + requireNonNull(executor); + requireNonNull(command); + requireNonNull(unit); + return DisabledFuture.INSTANCE; + } +} + +enum DisabledFuture implements Future { + INSTANCE; + + @Override public boolean isDone() { + return true; + } + @Override public boolean isCancelled() { + return false; + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + @Override public Void get() throws InterruptedException, ExecutionException { + return null; + } + @Override public Void get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + requireNonNull(unit); + return null; + } +} diff --git a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java index 7b78d23868..d859e77d84 100644 --- a/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java +++ b/caffeine/src/main/java/com/github/benmanes/caffeine/cache/TimerWheel.java @@ -15,6 +15,7 @@ */ package com.github.benmanes.caffeine.cache; +import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo; import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument; import static java.util.Objects.requireNonNull; @@ -97,8 +98,8 @@ public void advance(long currentTimeNanos) { try { nanos = currentTimeNanos; for (int i = 0; i < SHIFT.length; i++) { - long previousTicks = (previousTimeNanos >> SHIFT[i]); - long currentTicks = (currentTimeNanos >> SHIFT[i]); + long previousTicks = (previousTimeNanos >>> SHIFT[i]); + long currentTicks = (currentTimeNanos >>> SHIFT[i]); if ((currentTicks - previousTicks) <= 0L) { break; } @@ -205,8 +206,8 @@ Node findBucket(long time) { int length = wheel.length - 1; for (int i = 0; i < length; i++) { if (duration < SPANS[i + 1]) { - int ticks = (int) (time >> SHIFT[i]); - int index = ticks & (wheel[i].length - 1); + long ticks = (time >>> SHIFT[i]); + int index = (int) (ticks & (wheel[i].length - 1)); return wheel[i][index]; } } @@ -232,6 +233,28 @@ void unlink(Node node) { } } + /** Returns the duration until the next bucket expires, or {@link Long.MAX_VALUE} if none. */ + public long getExpirationDelay() { + for (int i = 0; i < SHIFT.length; i++) { + Node[] timerWheel = wheel[i]; + long ticks = (nanos >>> SHIFT[i]); + + long spanMask = SPANS[i] - 1; + int start = (int) (ticks & spanMask); + int end = start + timerWheel.length; + int mask = timerWheel.length - 1; + for (int j = start; j < end; j++) { + Node sentinel = timerWheel[(j & mask)]; + Node next = sentinel.getNextInVariableOrder(); + if (sentinel != next) { + long delay = ((j - start) * SPANS[i]) - (nanos & spanMask); + return (delay > 0) ? delay : (next.getVariableTime() - nanos); + } + } + } + return Long.MAX_VALUE; + } + /** * Returns an unmodifiable snapshot map roughly ordered by the expiration time. The wheels are * evaluated in order, but the timers that fall within the bucket's range are not sorted. Beware @@ -251,7 +274,7 @@ public Map snapshot(boolean ascending, int limit, @NonNull Function int indexOffset = ascending ? i : -i; int index = startLevel + indexOffset; - int ticks = (int) (nanos >> SHIFT[index]); + int ticks = (int) (nanos >>> SHIFT[index]); int bucketMask = (wheel[index].length - 1); int startBucket = (ticks & bucketMask) + (ascending ? 1 : 0); for (int j = 0; j < wheel[index].length; j++) { @@ -335,9 +358,4 @@ static final class Sentinel extends Node { @Override public void retire() {} @Override public void die() {} } - - static long ceilingPowerOfTwo(long x) { - // From Hacker's Delight, Chapter 3, Harry S. Warren Jr. - return 1L << -Long.numberOfLeadingZeros(x - 1); - } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java index 2bdb25e848..0e12735f50 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/BoundedLocalCacheTest.java @@ -39,7 +39,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -48,6 +47,7 @@ import org.testng.annotations.Listeners; import org.testng.annotations.Test; +import com.github.benmanes.caffeine.cache.BoundedLocalCache.PerformCleanupTask; import com.github.benmanes.caffeine.cache.Policy.Eviction; import com.github.benmanes.caffeine.cache.References.WeakKeyReference; import com.github.benmanes.caffeine.cache.testing.CacheContext; @@ -70,7 +70,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.common.testing.GcFinalization; /** * The test cases for the implementation details of {@link BoundedLocalCache}. @@ -80,13 +80,23 @@ @Listeners(CacheValidationListener.class) @Test(dataProviderClass = CacheProvider.class) public final class BoundedLocalCacheTest { - final Executor executor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).build()); static BoundedLocalCache asBoundedLocalCache(Cache cache) { return (BoundedLocalCache) cache.asMap(); } + @Test + @SuppressWarnings("UnusedVariable") + public void cleanupTask_allowGc() { + BoundedLocalCache cache = new BoundedLocalCache( + Caffeine.newBuilder(), /* loader */ null, /* async */ false) {}; + PerformCleanupTask task = cache.drainBuffersTask; + cache = null; + + GcFinalization.awaitClear(task.reference); + task.run(); + } + @Test public void scheduleAfterWrite() { BoundedLocalCache cache = new BoundedLocalCache( @@ -596,7 +606,7 @@ void checkDrainBlocks(BoundedLocalCache localCache, Runnable t ReentrantLock lock = localCache.evictionLock; lock.lock(); try { - executor.execute(() -> { + ConcurrentTestHarness.execute(() -> { localCache.lazySetDrainStatus(REQUIRED); task.run(); done.set(true); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CaffeineTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CaffeineTest.java index fbe55db3bf..1f42638ce9 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CaffeineTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/CaffeineTest.java @@ -564,6 +564,27 @@ public void softValues() { Caffeine.newBuilder().softValues().build(); } + /* --------------- scheduler --------------- */ + + @Test(expectedExceptions = NullPointerException.class) + public void scheduler_null() { + Caffeine.newBuilder().scheduler(null); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void scheduler_twice() { + Caffeine.newBuilder().scheduler(Scheduler.disabledScheduler()) + .scheduler(Scheduler.disabledScheduler()); + } + + @Test + public void scheduler() { + Scheduler scheduler = (executor, task, delay, unit) -> DisabledFuture.INSTANCE; + Caffeine builder = Caffeine.newBuilder().scheduler(scheduler); + assertThat(((GuardedScheduler) builder.getScheduler()).delegate, is(scheduler)); + builder.build(); + } + /* --------------- executor --------------- */ @Test(expectedExceptions = NullPointerException.class) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java index 41fd7e9842..cdf9213e99 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/ExpirationTest.java @@ -15,6 +15,7 @@ */ package com.github.benmanes.caffeine.cache; +import static com.github.benmanes.caffeine.cache.Pacer.TOLERANCE; import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_ACCESS; import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.AFTER_WRITE; import static com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration.VARIABLE; @@ -22,10 +23,22 @@ import static com.github.benmanes.caffeine.cache.testing.HasRemovalNotifications.hasRemovalNotifications; import static com.github.benmanes.caffeine.testing.IsFutureValue.futureOf; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.anEmptyMap; +import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; import java.util.List; import java.util.Map; @@ -34,6 +47,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.mockito.ArgumentCaptor; import org.testng.annotations.Listeners; import org.testng.annotations.Test; @@ -41,6 +55,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheProvider; import com.github.benmanes.caffeine.cache.testing.CacheSpec; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.ExecutorFailure; @@ -91,6 +106,55 @@ public void expire_zero(Cache cache, CacheContext context) { } } + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY, + mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE }, + expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS }, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, compute = Compute.SYNC, + scheduler = CacheScheduler.MOCK) + public void schedule(Cache cache, CacheContext context) { + ArgumentCaptor delay = ArgumentCaptor.forClass(long.class); + ArgumentCaptor task = ArgumentCaptor.forClass(Runnable.class); + doReturn(DisabledFuture.INSTANCE).when(context.scheduler()).schedule( + eq(context.executor()), task.capture(), delay.capture(), eq(TimeUnit.NANOSECONDS)); + + cache.put(context.absentKey(), context.absentValue()); + + long minError = TimeUnit.MINUTES.toNanos(1) - TOLERANCE; + long maxError = TimeUnit.MINUTES.toNanos(1) + TOLERANCE; + assertThat(delay.getValue(), is(both(greaterThan(minError)).and(lessThan(maxError)))); + + context.ticker().advance(delay.getValue()); + task.getValue().run(); + + if (context.expiresVariably()) { + // scheduled a timerWheel cascade, run next schedule + assertThat(delay.getAllValues(), hasSize(2)); + context.ticker().advance(delay.getValue()); + task.getValue().run(); + } + + assertThat(cache.asMap(), is(anEmptyMap())); + } + + @Test(dataProvider = "caches") + @CacheSpec(implementation = Implementation.Caffeine, population = Population.EMPTY, + mustExpireWithAnyOf = { AFTER_ACCESS, AFTER_WRITE, VARIABLE }, + expiry = { CacheExpiry.DISABLED, CacheExpiry.CREATE, CacheExpiry.WRITE, CacheExpiry.ACCESS }, + expireAfterAccess = {Expire.DISABLED, Expire.ONE_MINUTE}, expiryTime = Expire.ONE_MINUTE, + expireAfterWrite = {Expire.DISABLED, Expire.ONE_MINUTE}, compute = Compute.SYNC, + scheduler = CacheScheduler.MOCK) + public void schedule_immediate(Cache cache, CacheContext context) { + doAnswer(invocation -> { + ((Runnable) invocation.getArgument(1)).run(); + return DisabledFuture.INSTANCE; + }).when(context.scheduler()).schedule(any(), any(), anyLong(), any()); + + cache.put(context.absentKey(), context.absentValue()); + verify(context.scheduler(), atMostOnce()).schedule(any(), any(), anyLong(), any()); + } + /* --------------- Cache --------------- */ @Test(dataProvider = "caches", expectedExceptions = DeleteException.class) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java new file mode 100644 index 0000000000..5bc63ec43e --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/PacerTest.java @@ -0,0 +1,127 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; + +import java.util.Random; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.primitives.Ints; + +/** + * @author ben.manes@gmail.com (Ben Manes) + */ +@Test(singleThreaded = true) +public final class PacerTest { + private static final long ONE_MINUTE_IN_NANOS = TimeUnit.MINUTES.toNanos(1); + private static final Random random = new Random(); + private static final long NOW = random.nextLong(); + + @Mock Scheduler scheduler; + @Mock Executor executor; + @Mock Runnable command; + @Mock Future future; + + Pacer pacer; + + @BeforeMethod + public void beforeMethod() { + MockitoAnnotations.initMocks(this); + pacer = new Pacer(scheduler); + } + + @Test + public void scheduledAfterNextFireTime_skip() { + pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; + pacer.future = future; + + long expectedNextFireTime = pacer.nextFireTime; + pacer.schedule(executor, command, NOW, ONE_MINUTE_IN_NANOS); + + assertThat(pacer.future, is(future)); + assertThat(pacer.nextFireTime, is(expectedNextFireTime)); + verifyZeroInteractions(scheduler, executor, command, future); + } + + @Test + public void scheduledBeforeNextFireTime_skip() { + pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; + pacer.future = future; + + long expectedNextFireTime = pacer.nextFireTime; + long delay = ONE_MINUTE_IN_NANOS + - Math.max(1, random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE))); + pacer.schedule(executor, command, NOW, delay); + + assertThat(pacer.future, is(future)); + assertThat(pacer.nextFireTime, is(expectedNextFireTime)); + verifyZeroInteractions(scheduler, executor, command, future); + } + + @Test + public void scheduledBeforeNextFireTime_minimumDelay() { + pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; + pacer.future = future; + + long delay = random.nextInt(Ints.saturatedCast(Pacer.TOLERANCE)); + doReturn(DisabledFuture.INSTANCE) + .when(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); + pacer.schedule(executor, command, NOW, delay); + + assertThat(pacer.future, is(DisabledFuture.INSTANCE)); + assertThat(pacer.nextFireTime, is(NOW + Pacer.TOLERANCE)); + + verify(future).cancel(anyBoolean()); + verify(scheduler).schedule(executor, command, Pacer.TOLERANCE, TimeUnit.NANOSECONDS); + + verifyZeroInteractions(executor, command); + verifyNoMoreInteractions(scheduler, future); + } + + @Test + public void scheduledBeforeNextFireTime_customDelay() { + pacer.nextFireTime = NOW + ONE_MINUTE_IN_NANOS; + pacer.future = future; + + long delay = (Pacer.TOLERANCE + Math.max(1, random.nextInt())); + doReturn(DisabledFuture.INSTANCE) + .when(scheduler).schedule(executor, command, delay, TimeUnit.NANOSECONDS); + pacer.schedule(executor, command, NOW, delay); + + assertThat(pacer.future, is(DisabledFuture.INSTANCE)); + assertThat(pacer.nextFireTime, is(NOW + delay)); + + verify(future).cancel(anyBoolean()); + verify(scheduler).schedule(executor, command, delay, TimeUnit.NANOSECONDS); + + verifyZeroInteractions(executor, command); + verifyNoMoreInteractions(scheduler, future); + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RandomSeedEnforcer.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RandomSeedEnforcer.java index ca4861fe11..2bba16a9db 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RandomSeedEnforcer.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/RandomSeedEnforcer.java @@ -25,18 +25,18 @@ public final class RandomSeedEnforcer { static final long PROBE = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomProbe"); static final long SEED = UnsafeAccess.objectFieldOffset(Thread.class, "threadLocalRandomSeed"); + static final int RANDOM_PROBE = 0x9e3779b9; static final int RANDOM_SEED = 1033096058; private RandomSeedEnforcer() {} - /** Force the random seed to a predictable value. */ - public static void ensureRandomSeed(Cache cache) { - resetThreadLocalRandom(); + /** Forces the eviction jitter to be predictable. */ + public static void resetThreadLocalRandom() { + setThreadLocalRandom(RANDOM_PROBE, RANDOM_SEED); } - /** Forces the eviction jitter to be predictable. */ - private static void resetThreadLocalRandom() { - UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, 0x9e3779b9); - UnsafeAccess.UNSAFE.putLong(Thread.currentThread(), SEED, RANDOM_SEED); + public static void setThreadLocalRandom(int probe, int seed) { + UnsafeAccess.UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + UnsafeAccess.UNSAFE.putLong(Thread.currentThread(), SEED, seed); } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/SchedulerTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/SchedulerTest.java new file mode 100644 index 0000000000..d0eeb00afe --- /dev/null +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/SchedulerTest.java @@ -0,0 +1,198 @@ +/* + * Copyright 2019 Ben Manes. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.cache; + +import static com.github.benmanes.caffeine.testing.Awaits.await; +import static com.github.benmanes.caffeine.testing.ConcurrentTestHarness.scheduledExecutor; +import static com.google.common.util.concurrent.testing.TestingExecutors.sameThreadScheduledExecutor; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.LogManager; + +import org.apache.commons.lang3.SystemUtils; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.github.benmanes.caffeine.testing.ConcurrentTestHarness; +import com.google.common.collect.ImmutableSet; +import com.google.common.testing.NullPointerTester; + +/** + * @author ben.manes@gmail.com (Ben Manes) + */ +@SuppressWarnings("FutureReturnValueIgnored") +public final class SchedulerTest { + static { + // disable logging warnings caused by exceptions + LogManager.getLogManager().reset(); + } + + private final NullPointerTester npeTester = new NullPointerTester(); + + @Test + public void hasSystemScheduler() { + Scheduler scheduler = SystemUtils.IS_JAVA_1_8 + ? Scheduler.disabledScheduler() + : SystemScheduler.INSTANCE; + assertThat(Scheduler.systemScheduler(), is(scheduler)); + } + + @Test(dataProvider = "schedulers") + public void scheduler_null(Scheduler scheduler) { + npeTester.testAllPublicInstanceMethods(scheduler); + } + + @Test(dataProvider = "runnableSchedulers") + public void scheduler_exception(Scheduler scheduler) { + AtomicBoolean executed = new AtomicBoolean(); + Executor executor = task -> { + executed.set(true); + throw new IllegalStateException(); + }; + scheduler.schedule(executor, () -> {}, 1L, TimeUnit.NANOSECONDS); + await().untilTrue(executed); + } + + @Test(dataProvider = "runnableSchedulers") + public void scheduler(Scheduler scheduler) { + AtomicBoolean executed = new AtomicBoolean(); + Runnable task = () -> executed.set(true); + scheduler.schedule(ConcurrentTestHarness.executor, task, 1L, TimeUnit.NANOSECONDS); + await().untilTrue(executed); + } + + /* --------------- disabled --------------- */ + + @Test + public void disabledScheduler() { + Future future = Scheduler.disabledScheduler() + .schedule(Runnable::run, () -> {}, 1, TimeUnit.MINUTES); + assertThat(future, is(DisabledFuture.INSTANCE)); + } + + @Test + public void disabledFuture_null() { + npeTester.testAllPublicInstanceMethods(DisabledFuture.INSTANCE); + } + + /* --------------- guarded --------------- */ + + @Test(expectedExceptions = NullPointerException.class) + public void guardedScheduler_null() { + Scheduler.guardedScheduler(null); + } + + @Test + public void guardedScheduler_nullFuture() { + ScheduledExecutorService scheduledExecutor = Mockito.mock(ScheduledExecutorService.class); + Scheduler scheduler = Scheduler.forScheduledExecutorService(scheduledExecutor); + Executor executor = Mockito.mock(Executor.class); + Runnable command = () -> {}; + + Future future = Scheduler.guardedScheduler(scheduler) + .schedule(executor, command, 1L, TimeUnit.MINUTES); + verify(scheduledExecutor).schedule(any(Runnable.class), eq(1L), eq(TimeUnit.MINUTES)); + assertThat(future, is(DisabledFuture.INSTANCE)); + } + + @Test + public void guardedScheduler() { + Future future = Scheduler.guardedScheduler(Scheduler.disabledScheduler()) + .schedule(Runnable::run, () -> {}, 1, TimeUnit.MINUTES); + assertThat(future, is(DisabledFuture.INSTANCE)); + } + + /* --------------- ScheduledExecutorService --------------- */ + + @Test(expectedExceptions = NullPointerException.class) + public void scheduledExecutorService_null() { + Scheduler.forScheduledExecutorService(null); + } + + @Test + public void scheduledExecutorService_schedule() { + ScheduledExecutorService scheduledExecutor = Mockito.mock(ScheduledExecutorService.class); + ArgumentCaptor task = ArgumentCaptor.forClass(Runnable.class); + Executor executor = Mockito.mock(Executor.class); + Runnable command = () -> {}; + + Scheduler scheduler = Scheduler.forScheduledExecutorService(scheduledExecutor); + Future future = scheduler.schedule(executor, command, 1L, TimeUnit.MINUTES); + assertThat(future, is(not(DisabledFuture.INSTANCE))); + + verify(scheduledExecutor).isShutdown(); + verify(scheduledExecutor).schedule(task.capture(), eq(1L), eq(TimeUnit.MINUTES)); + verifyNoMoreInteractions(scheduledExecutor); + + task.getValue().run(); + verify(executor).execute(command); + verifyNoMoreInteractions(executor); + } + + @Test + public void scheduledExecutorService_shutdown() { + ScheduledExecutorService scheduledExecutor = Mockito.mock(ScheduledExecutorService.class); + Executor executor = Mockito.mock(Executor.class); + + when(scheduledExecutor.isShutdown()).thenReturn(true); + Scheduler scheduler = Scheduler.forScheduledExecutorService(scheduledExecutor); + Future future = scheduler.schedule(executor, () -> {}, 1L, TimeUnit.MINUTES); + assertThat(future, is(DisabledFuture.INSTANCE)); + + verify(scheduledExecutor).isShutdown(); + verifyNoMoreInteractions(scheduledExecutor); + verifyZeroInteractions(executor); + } + + /* --------------- providers --------------- */ + + @DataProvider(name = "schedulers") + public Iterator providesSchedulers() { + ImmutableSet schedulers = ImmutableSet.of( + Scheduler.forScheduledExecutorService(sameThreadScheduledExecutor()), + Scheduler.forScheduledExecutorService(scheduledExecutor), + Scheduler.disabledScheduler(), + Scheduler.systemScheduler()); + return schedulers.iterator(); + } + + @DataProvider(name = "runnableSchedulers") + public Iterator providesRunnableSchedulers() { + ImmutableSet schedulers = ImmutableSet.of( + Scheduler.forScheduledExecutorService(sameThreadScheduledExecutor()), + Scheduler.forScheduledExecutorService(scheduledExecutor), + Scheduler.systemScheduler()); + return schedulers.stream() + .filter(scheduler -> scheduler != Scheduler.disabledScheduler()) + .iterator(); + } +} diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java index 0a518feb8c..448ab9f599 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/TimerWheelTest.java @@ -34,6 +34,7 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; +import java.util.Random; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -52,35 +53,40 @@ import com.google.common.collect.ImmutableList; import it.unimi.dsi.fastutil.longs.LongArrayList; -import it.unimi.dsi.fastutil.longs.LongList; /** * @author ben.manes@gmail.com (Ben Manes) */ @Test(singleThreaded = true) public final class TimerWheelTest { - TimerWheel timerWheel; - @Mock BoundedLocalCache cache; + private static final Random random = new Random(); + private static final long NOW = random.nextLong(); + @Captor ArgumentCaptor> captor; + @Mock BoundedLocalCache cache; + TimerWheel timerWheel; @BeforeMethod public void beforeMethod() { MockitoAnnotations.initMocks(this); timerWheel = new TimerWheel<>(cache); + + RandomSeedEnforcer.setThreadLocalRandom(random.nextInt(), random.nextInt()); } @Test(dataProvider = "schedule") public void schedule(long nanos, int expired) { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); + timerWheel.nanos = NOW; for (int timeout : new int[] { 25, 90, 240 }) { - timerWheel.schedule(new Timer(TimeUnit.SECONDS.toNanos(timeout))); + timerWheel.schedule(new Timer(NOW + TimeUnit.SECONDS.toNanos(timeout))); } - timerWheel.advance(nanos); + timerWheel.advance(NOW + nanos); verify(cache, times(expired)).evictEntry(any(), any(), anyLong()); for (Node node : captor.getAllValues()) { - assertThat(node.getVariableTime(), is(lessThan(nanos))); + assertThat(node.getVariableTime(), is(lessThan(NOW + nanos))); } } @@ -114,6 +120,37 @@ public void schedule_fuzzy(long clock, long nanos, long[] times) { checkTimerWheel(nanos); } + @Test(dataProvider = "fuzzySchedule") + public void getExpirationDelay(long clock, long nanos, long[] times) { + when(cache.evictEntry(any(), any(), anyLong())).thenReturn(true); + timerWheel.nanos = clock; + for (long timeout : times) { + timerWheel.schedule(new Timer(timeout)); + } + timerWheel.advance(nanos); + + int minSpan = Integer.MAX_VALUE; + long minDelay = Long.MAX_VALUE; + for (int i = 0; i < timerWheel.wheel.length; i++) { + for (int j = 0; j < timerWheel.wheel[i].length; j++) { + LongArrayList timers = getTimers(timerWheel.wheel[i][j]); + if (!timers.isEmpty()) { + minSpan = Math.min(minSpan, i); + for (int k = 0; k < timers.size(); k++) { + minDelay = Math.min(minDelay, timers.getLong(k) - nanos); + } + } + } + } + + long delay = timerWheel.getExpirationDelay(); + if (minDelay == Long.MAX_VALUE) { + assertThat(delay, is(Long.MAX_VALUE)); + } else { + assertThat(delay, is(lessThan(minDelay + SPANS[minSpan]))); + } + } + @DataProvider(name = "fuzzySchedule") public Object[][] providesFuzzySchedule() { long[] times = new long[5_000]; @@ -139,8 +176,8 @@ private void checkTimerWheel(long nanos) { } } - private LongList getTimers(Node sentinel) { - LongList timers = new LongArrayList(); + private LongArrayList getTimers(Node sentinel) { + LongArrayList timers = new LongArrayList(); for (Node node = sentinel.getNextInVariableOrder(); node != sentinel; node = node.getNextInVariableOrder()) { timers.add(node.getVariableTime()); @@ -151,16 +188,17 @@ private LongList getTimers(Node sentinel) { @Test public void reschedule() { when(cache.evictEntry(captor.capture(), any(), anyLong())).thenReturn(true); + timerWheel.nanos = NOW; - Timer timer = new Timer(TimeUnit.MINUTES.toNanos(15)); + Timer timer = new Timer(NOW + TimeUnit.MINUTES.toNanos(15)); timerWheel.schedule(timer); Node startBucket = timer.getNextInVariableOrder(); - timer.setVariableTime(TimeUnit.HOURS.toNanos(2)); + timer.setVariableTime(NOW + TimeUnit.HOURS.toNanos(2)); timerWheel.reschedule(timer); assertThat(timer.getNextInVariableOrder(), is(not(startBucket))); - timerWheel.advance(TimeUnit.DAYS.toNanos(1)); + timerWheel.advance(NOW + TimeUnit.DAYS.toNanos(1)); checkEmpty(); } @@ -176,7 +214,8 @@ private void checkEmpty() { @Test public void deschedule() { - Timer timer = new Timer(100); + Timer timer = new Timer(NOW + 100); + timerWheel.nanos = NOW; timerWheel.schedule(timer); timerWheel.deschedule(timer); assertThat(timer.getNextInVariableOrder(), is(nullValue())); @@ -185,7 +224,8 @@ public void deschedule() { @Test public void deschedule_notScheduled() { - timerWheel.deschedule(new Timer(100)); + timerWheel.nanos = NOW; + timerWheel.deschedule(new Timer(NOW + 100)); } @Test(dataProvider = "fuzzySchedule") @@ -212,8 +252,9 @@ public void expire_reschedule() { return false; }); - timerWheel.schedule(new Timer(100)); - timerWheel.advance(TimerWheel.SPANS[0]); + timerWheel.nanos = NOW; + timerWheel.schedule(new Timer(NOW + 100)); + timerWheel.advance(NOW + TimerWheel.SPANS[0]); verify(cache).evictEntry(any(), any(), anyLong()); assertThat(captor.getValue().getNextInVariableOrder(), is(not(nullValue()))); @@ -222,11 +263,12 @@ public void expire_reschedule() { @Test(dataProvider = "cascade") public void cascade(long nanos, long timeout, int span) { - timerWheel.schedule(new Timer(timeout)); - timerWheel.advance(nanos); + timerWheel.nanos = NOW; + timerWheel.schedule(new Timer(NOW + timeout)); + timerWheel.advance(NOW + nanos); int count = 0; - for (int i = 0; i < span; i++) { + for (int i = 0; i <= span; i++) { for (int j = 0; j < timerWheel.wheel[i].length; j++) { count += getTimers(timerWheel.wheel[i][j]).size(); } @@ -247,13 +289,13 @@ public Iterator providesCascade() { } @Test(dataProvider = "snapshot") - public void snapshot(boolean ascending, int limit, long nanos, Function transformer) { + public void snapshot(boolean ascending, int limit, long clock, Function transformer) { int count = 21; - timerWheel.nanos = nanos; + timerWheel.nanos = clock; int expected = Math.min(limit, count); Comparator order = ascending ? Comparator.naturalOrder() : Comparator.reverseOrder(); List times = IntStream.range(0, count).mapToLong(i -> { - long time = nanos + TimeUnit.SECONDS.toNanos(2 << i); + long time = clock + TimeUnit.SECONDS.toNanos(2 << i); timerWheel.schedule(new Timer(time)); return time; }).boxed().sorted(order).collect(toList()).subList(0, expected); @@ -270,11 +312,11 @@ private List snapshot(boolean ascending, int limit, Function t @DataProvider(name="snapshot") public Iterator providesSnaphot() { List scenarios = new ArrayList<>(); - for (long nanos : new long[] {0L, System.nanoTime() }) { + for (long clock : new long[] {0L, NOW }) { for (int limit : new int[] { 10, 100 }) { scenarios.addAll(Arrays.asList( - new Object[] { /* ascending */ true, limit, nanos, Mockito.mock(Function.class) }, - new Object[] { /* ascending */ false, limit, nanos, Mockito.mock(Function.class) })); + new Object[] { /* ascending */ true, limit, clock, Mockito.mock(Function.class) }, + new Object[] { /* ascending */ false, limit, clock, Mockito.mock(Function.class) })); } } return scenarios.iterator(); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java index 7a6ed50370..ba0b5634af 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheContext.java @@ -44,10 +44,12 @@ import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; import com.github.benmanes.caffeine.cache.stats.CacheStats; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expiration; @@ -81,6 +83,7 @@ public final class CacheContext { final Expiry expiry; final Map original; final Implementation implementation; + final CacheScheduler cacheScheduler; final Listener removalListenerType; final CacheExecutor cacheExecutor; final ReferenceType valueStrength; @@ -89,6 +92,7 @@ public final class CacheContext { final Population population; final CacheWeigher weigher; final Maximum maximumSize; + final Scheduler scheduler; final Expire afterAccess; final Expire afterWrite; final Expire expiryTime; @@ -122,9 +126,9 @@ public final class CacheContext { public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher weigher, Maximum maximumSize, CacheExpiry expiryType, Expire afterAccess, Expire afterWrite, Expire refresh, Advance advance, ReferenceType keyStrength, ReferenceType valueStrength, - CacheExecutor cacheExecutor, Listener removalListenerType, Population population, - boolean isLoading, boolean isAsyncLoading, Compute compute, Loader loader, Writer writer, - Implementation implementation, CacheSpec cacheSpec) { + CacheExecutor cacheExecutor, CacheScheduler cacheScheduler, Listener removalListenerType, + Population population, boolean isLoading, boolean isAsyncLoading, Compute compute, + Loader loader, Writer writer, Implementation implementation, CacheSpec cacheSpec) { this.initialCapacity = requireNonNull(initialCapacity); this.stats = requireNonNull(stats); this.weigher = requireNonNull(weigher); @@ -137,6 +141,8 @@ public CacheContext(InitialCapacity initialCapacity, Stats stats, CacheWeigher w this.valueStrength = requireNonNull(valueStrength); this.cacheExecutor = requireNonNull(cacheExecutor); this.executor = cacheExecutor.create(); + this.cacheScheduler = requireNonNull(cacheScheduler); + this.scheduler = cacheScheduler.create(); this.removalListenerType = removalListenerType; this.removalListener = removalListenerType.create(); this.population = requireNonNull(population); @@ -456,6 +462,10 @@ public Executor executor() { return executor; } + public Scheduler scheduler() { + return scheduler; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -474,6 +484,7 @@ public String toString() { .add("isAsyncLoading", isAsyncLoading) .add("writer", writer) .add("cacheExecutor", cacheExecutor) + .add("cacheScheduler", cacheScheduler) .add("removalListener", removalListenerType) .add("initialCapacity", initialCapacity) .add("stats", stats) diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheGenerator.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheGenerator.java index 173184dfb4..8e5541d57b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheGenerator.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheGenerator.java @@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.testing.CacheSpec.Advance; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Compute; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire; @@ -124,6 +125,7 @@ private Set> combinations() { ImmutableSet.copyOf(keys), ImmutableSet.copyOf(values), ImmutableSet.copyOf(cacheSpec.executor()), + ImmutableSet.copyOf(cacheSpec.scheduler()), ImmutableSet.copyOf(cacheSpec.removalListener()), ImmutableSet.copyOf(cacheSpec.population()), ImmutableSet.of(true, isLoadingOnly), @@ -160,6 +162,7 @@ private CacheContext newCacheContext(List combination) { (ReferenceType) combination.get(index++), (ReferenceType) combination.get(index++), (CacheExecutor) combination.get(index++), + (CacheScheduler) combination.get(index++), (Listener) combination.get(index++), (Population) combination.get(index++), (Boolean) combination.get(index++), @@ -187,11 +190,14 @@ private boolean isCompatible(CacheContext context) { || (context.expireAfterWrite() != Expire.DISABLED)); boolean expirationIncompatible = (cacheSpec.mustExpireWithAnyOf().length > 0) && !Arrays.stream(cacheSpec.mustExpireWithAnyOf()).anyMatch(context::expires); + boolean schedulerIgnored = (context.cacheScheduler != CacheScheduler.DEFAULT) + && !context.expires(); boolean skip = asyncIncompatible || asyncLoaderIncompatible || refreshIncompatible || weigherIncompatible || expiryIncompatible || expirationIncompatible - || referenceIncompatible; + || referenceIncompatible + || schedulerIgnored; return !skip; } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java index 04f40cca99..4af12ff2a7 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheSpec.java @@ -15,6 +15,7 @@ */ package com.github.benmanes.caffeine.cache.testing; +import static com.github.benmanes.caffeine.testing.ConcurrentTestHarness.scheduledExecutor; import static java.lang.annotation.ElementType.METHOD; import static java.lang.annotation.RetentionPolicy.RUNTIME; import static java.util.Objects.requireNonNull; @@ -39,6 +40,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.mockito.Mockito; @@ -48,6 +50,7 @@ import com.github.benmanes.caffeine.cache.Expiry; import com.github.benmanes.caffeine.cache.LoadingCache; import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; import com.github.benmanes.caffeine.cache.Weigher; import com.github.benmanes.caffeine.cache.testing.RemovalListeners.ConsumingRemovalListener; import com.google.common.collect.Iterables; @@ -652,6 +655,31 @@ enum CacheExecutor { public abstract Executor create(); } + /* --------------- Scheduler --------------- */ + + /** The executors retrieved from a supplier, each resulting in a new combination. */ + CacheScheduler[] scheduler() default { + CacheScheduler.DEFAULT, + }; + + /** The scheduler that the cache can be configured with. */ + enum CacheScheduler { + DEFAULT(() -> null), // disabled + SYSTEM(Scheduler::systemScheduler), + THREADED(() -> Scheduler.forScheduledExecutorService(scheduledExecutor)), + MOCK(() -> Mockito.mock(Scheduler.class)); + + private final Supplier scheduler; + + private CacheScheduler(Supplier scheduler) { + this.scheduler = requireNonNull(scheduler); + } + + public Scheduler create() { + return scheduler.get(); + } + } + /* --------------- Populated --------------- */ /** diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java index b4ec4d0b4a..c5b34c6af6 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CacheValidationListener.java @@ -55,7 +55,7 @@ */ public final class CacheValidationListener implements IInvokedMethodListener { private static final Cache simpleNames = Caffeine.newBuilder().build(); - private static final AtomicBoolean failed = new AtomicBoolean(); + private static final AtomicBoolean detailedParams = new AtomicBoolean(); @Override public void beforeInvocation(IInvokedMethod method, ITestResult testResult) {} @@ -64,33 +64,10 @@ public void beforeInvocation(IInvokedMethod method, ITestResult testResult) {} public void afterInvocation(IInvokedMethod method, ITestResult testResult) { try { if (testResult.isSuccess()) { - boolean foundCache = false; - CacheContext context = null; - for (Object param : testResult.getParameters()) { - if (param instanceof Cache) { - foundCache = true; - assertThat((Cache) param, is(validCache())); - } else if (param instanceof AsyncLoadingCache) { - foundCache = true; - assertThat((AsyncLoadingCache) param, is(validAsyncCache())); - } else if (param instanceof Map) { - foundCache = true; - assertThat((Map) param, is(validAsMap())); - } else if (param instanceof CacheContext) { - context = (CacheContext) param; - } - } - if (context != null) { - if (!foundCache) { - assertThat(context.cache, is(validCache())); - } - checkWriter(testResult, context); - checkNoStats(testResult, context); - checkExecutor(testResult, context); - } + validate(testResult); } else { - if (!failed.get()) { - failed.set(true); + if (!detailedParams.get()) { + detailedParams.set(true); } testResult.setThrowable(new AssertionError(getTestName(method), testResult.getThrowable())); } @@ -102,6 +79,34 @@ public void afterInvocation(IInvokedMethod method, ITestResult testResult) { } } + /** Validates the internal state of the cache. */ + private void validate(ITestResult testResult) { + boolean foundCache = false; + CacheContext context = null; + for (Object param : testResult.getParameters()) { + if (param instanceof Cache) { + foundCache = true; + assertThat((Cache) param, is(validCache())); + } else if (param instanceof AsyncLoadingCache) { + foundCache = true; + assertThat((AsyncLoadingCache) param, is(validAsyncCache())); + } else if (param instanceof Map) { + foundCache = true; + assertThat((Map) param, is(validAsMap())); + } else if (param instanceof CacheContext) { + context = (CacheContext) param; + } + } + if (context != null) { + if (!foundCache) { + assertThat(context.cache, is(validCache())); + } + checkWriter(testResult, context); + checkNoStats(testResult, context); + checkExecutor(testResult, context); + } + } + /** Returns the name of the executed test. */ private static String getTestName(IInvokedMethod method) { return StringUtils.substringAfterLast(method.getTestMethod().getTestClass().getName(), ".") @@ -164,7 +169,7 @@ private void cleanUp(ITestResult testResult) { if ((param instanceof AsyncCache) || (param instanceof Cache) || (param instanceof Map) || (param instanceof Eviction) || (param instanceof Expiration) || (param instanceof VarExpiration) - || ((param instanceof CacheContext) && !failed.get())) { + || ((param instanceof CacheContext) && !detailedParams.get())) { params[i] = simpleNames.get(param.getClass(), key -> ((Class) key).getSimpleName()); } else if (param instanceof CacheContext) { params[i] = simpleNames.get(param.toString(), Object::toString); diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java index 3f5c14ed63..ac9cd1cb2b 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/cache/testing/CaffeineCacheFromContext.java @@ -24,6 +24,7 @@ import com.github.benmanes.caffeine.cache.Ticker; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExecutor; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheExpiry; +import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheScheduler; import com.github.benmanes.caffeine.cache.testing.CacheSpec.CacheWeigher; import com.github.benmanes.caffeine.cache.testing.CacheSpec.Expire; import com.github.benmanes.caffeine.cache.testing.CacheSpec.InitialCapacity; @@ -88,6 +89,9 @@ public static Cache newCaffeineCache(CacheContext context) { if (context.cacheExecutor != CacheExecutor.DEFAULT) { builder.executor(context.executor); } + if (context.cacheScheduler != CacheScheduler.DEFAULT) { + builder.scheduler(context.scheduler); + } if (context.removalListenerType != Listener.DEFAULT) { builder.removalListener(context.removalListener); } @@ -110,7 +114,7 @@ public static Cache newCaffeineCache(CacheContext context) { @SuppressWarnings("unchecked") Cache castedCache = (Cache) context.cache; - RandomSeedEnforcer.ensureRandomSeed(castedCache); + RandomSeedEnforcer.resetThreadLocalRandom(); return castedCache; } } diff --git a/caffeine/src/test/java/com/github/benmanes/caffeine/testing/ConcurrentTestHarness.java b/caffeine/src/test/java/com/github/benmanes/caffeine/testing/ConcurrentTestHarness.java index 3db10c0b28..bd390e4b41 100644 --- a/caffeine/src/test/java/com/github/benmanes/caffeine/testing/ConcurrentTestHarness.java +++ b/caffeine/src/test/java/com/github/benmanes/caffeine/testing/ConcurrentTestHarness.java @@ -21,6 +21,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicReferenceArray; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -41,8 +43,11 @@ * @author ben.manes@gmail.com (Ben Manes) */ public final class ConcurrentTestHarness { - private static final Executor executor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setPriority(Thread.MIN_PRIORITY).setDaemon(true).build()); + private static final ThreadFactory DAEMON_FACTORY = new ThreadFactoryBuilder() + .setPriority(Thread.MIN_PRIORITY).setDaemon(true).build(); + public static final ScheduledExecutorService scheduledExecutor = + Executors.newSingleThreadScheduledExecutor(DAEMON_FACTORY); + public static final Executor executor = Executors.newCachedThreadPool(DAEMON_FACTORY); private ConcurrentTestHarness() {} diff --git a/gradle.properties b/gradle.properties index 3e2ba3c3ba..ea6188903f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,4 @@ org.gradle.jvmargs=-Xmx1024m -XX:+UseG1GC -XX:SoftRefLRUPolicyMSPerMB=0 -noverify -XX:+HeapDumpOnOutOfMemoryError -org.gradle.caching=true org.gradle.daemon=true nexusUsername= nexusPassword= diff --git a/gradle/codeQuality.gradle b/gradle/codeQuality.gradle index 887e3fd9a0..48e32074de 100644 --- a/gradle/codeQuality.gradle +++ b/gradle/codeQuality.gradle @@ -49,6 +49,7 @@ spotbugs { pmd { ruleSets = [] toolVersion = pluginVersions.pmd + incrementalAnalysis = !System.env.'CI' ruleSetConfig = resources.text.fromFile(file("${rootDir}/config/pmd/rulesSets.xml")) } diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 5fb7d6dd71..c3ee24bb68 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -25,14 +25,14 @@ */ ext { versions = [ - akka: '2.6.0-M4', - checkerFramework: '2.9.0', + akka: '2.6.0-M5', + checkerFramework: '2.10.0', commonsCompress: '1.18', commonsLang3: '3.9', config: '1.3.4', errorprone: '2.3.3', errorproneJavac: '9+181-r4173-1', - fastutil: '8.2.3', + fastutil: '8.3.0', flipTables: '1.0.2', guava: '28.0-jre', javapoet: '1.11.1', @@ -60,11 +60,11 @@ ext { collision: '0.3.3', commonsMath3: '3.6.1', concurrentlinkedhashmap: '1.4.2', - ehcache3: '3.7.1', - elasticSearch: '7.2.0', + ehcache3: '3.8.0', + elasticSearch: '7.3.0', expiringMap: '0.5.9', fastfilter: 'bf0b02297f', - jackrabbit: '1.14.0', + jackrabbit: '1.16.0', jamm: '0.3.3', javaObjectLayout: '0.9', jmh: '1.21', @@ -78,16 +78,16 @@ ext { apt: '0.21', bnd: '4.2.0', buildscan: '2.3', - checkstyle: '8.22', + checkstyle: '8.23', coveralls: '2.8.3', coverity: '1.0.10', errorprone: '0.8.1', jacoco: '0.8.4', - jmh: '0.5.0-rc-1', + jmh: '0.5.0-rc-2', jmhReport: '0.9.0', nexus: '2.3.1', nullaway: '0.3', - pmd: '6.16.0', + pmd: '6.17.0', semanticVersioning: '1.1.0', shadow: '5.1.0', sonarqube: '2.7.1', diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b0714d1511..5d1fe27669 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-5.5.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-rc-1-bin.zip