Skip to content

Commit

Permalink
Support prompt expiration via an optional scheduler thread (fixes #195)
Browse files Browse the repository at this point in the history
Typically the cache using an amortized maintenance routine to periodically
keep the eviction policies up to date. For size this is to replay hits or
evict, for expiration this is to discover stale entries, and for reference
caching this is to find GC'd keys or values. Each policy uses O(1)
algorithms to be cheap, batches the work to avoid lock contention, and
performs this routine maintenance as a side effect of normal cache
operations.

If there is no activity on the cache then the routine maintenance is not
triggered, as that requires a background scheduling thread. Generally this
is okay as the memory was already in consumed, no activity may indicate low
system usage, and the cache is a transient data store. However, some do want
to trigger business logic and leverage the cache as a timing subsystem, e.g.
using the `RemovalListener` to notify another component.

The cache itself does not create or manage threads, but can defer work to
a configured `Executor`. Similarly now it can now schedule the maintenance
routine on a configured `Scheduler`, default disabled. The cache will still
perform maintenance periodically and this scheduler only augments it to add
additional liveliness. The scheduling is based on the expiration time only;
users may use `Cache.cleanUp()` for fixed periods to assist reference
caching, etc as desired.

The cache will rate limit the scheduler so that it runs at a reasonable pace
and never thrashes. For example, if the next expiration times are tiny
durations apart (e.g. 200ms), the cache will not run the maintenance task
5x/s. It will instead ensure a 1 second delay (2^30ns) between scheduled runs
(not accounting for the lack of real-time guarantees by a Scheduler impl).
Similarly an additional delay may occur due to the priority queues being held
in best-effort order.

The scheduling for variable expiration is based on the TimerWheel's bucket
expiration, which cascades events. For example a timer of 1h 13m 5s would
schedule to run in 1h, which would cascade the 1hr bucket into the minute
buckets. The next schedule would be at 13m, followed by 5s.

Java 9+ users should prefer using `Scheduler.systemScheduler()` rather than
creating their own dedicated threads. This leverages the JVM-wide scheduling
thread, primarily intended for `CompletableFuture.orTimeout(duration)`. In
Java 8 this method returns `Scheduler.disabledScheduler` instead.
  • Loading branch information
ben-manes committed Aug 2, 2019
1 parent cc38ca4 commit 405f98a
Show file tree
Hide file tree
Showing 28 changed files with 1,111 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ public static boolean usesExpirationTicker(Set<Feature> features) {
|| features.contains(Feature.REFRESH_WRITE);
}

public static boolean usesExpiration(Set<Feature> features) {
return features.contains(Feature.EXPIRE_ACCESS)
|| features.contains(Feature.EXPIRE_WRITE);
}

public static boolean usesMaximum(Set<Feature> features) {
return features.contains(Feature.MAXIMUM_SIZE)
|| features.contains(Feature.MAXIMUM_WEIGHT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import com.github.benmanes.caffeine.cache.local.AddMaximum;
import com.github.benmanes.caffeine.cache.local.AddRefreshAfterWrite;
import com.github.benmanes.caffeine.cache.local.AddRemovalListener;
import com.github.benmanes.caffeine.cache.local.AddPacer;
import com.github.benmanes.caffeine.cache.local.AddStats;
import com.github.benmanes.caffeine.cache.local.AddSubtype;
import com.github.benmanes.caffeine.cache.local.AddWriteBuffer;
Expand Down Expand Up @@ -83,7 +84,7 @@ public final class LocalCacheFactoryGenerator {
new AddKeyValueStrength(), new AddRemovalListener(), new AddStats(),
new AddExpirationTicker(), new AddMaximum(), new AddFastPath(), new AddDeques(),
new AddExpireAfterAccess(), new AddExpireAfterWrite(), new AddRefreshAfterWrite(),
new AddWriteBuffer(), new Finalize());
new AddWriteBuffer(), new AddPacer(), new Finalize());
final ZoneId timeZone = ZoneId.of("America/Los_Angeles");
final Path directory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public final class Specifications {
public static final TypeName STATS_COUNTER =
ClassName.get(PACKAGE_NAME + ".stats", "StatsCounter");
public static final TypeName TICKER = ClassName.get(PACKAGE_NAME, "Ticker");
public static final TypeName PACER = ClassName.get(PACKAGE_NAME, "Pacer");

public static final TypeName ACCESS_ORDER_DEQUE =
ParameterizedTypeName.get(ClassName.get(PACKAGE_NAME, "AccessOrderDeque"), NODE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2019 Ben Manes. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.cache.local;

import static com.github.benmanes.caffeine.cache.Specifications.PACER;

import javax.lang.model.element.Modifier;

import com.github.benmanes.caffeine.cache.Feature;
import com.squareup.javapoet.FieldSpec;
import com.squareup.javapoet.MethodSpec;

/**
* @author [email protected] (Ben Manes)
*/
public final class AddPacer extends LocalCacheRule {

@Override
protected boolean applies() {
return !(Feature.usesExpiration(context.parentFeatures)
|| !Feature.usesExpiration(context.generateFeatures));
}

@Override
protected void execute() {
context.constructor.addStatement("this.pacer = ($1L == $2L)\n? null\n: new Pacer($1L)",
"builder.getScheduler()", "Scheduler.disabledScheduler()");
context.cache.addField(FieldSpec.builder(PACER, "pacer", Modifier.FINAL).build());
context.cache.addMethod(MethodSpec.methodBuilder("pacer")
.addModifiers(context.publicFinalModifiers())
.addStatement("return pacer")
.returns(PACER)
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.github.benmanes.caffeine.cache;

import static com.github.benmanes.caffeine.cache.Async.ASYNC_EXPIRY;
import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo;
import static com.github.benmanes.caffeine.cache.Caffeine.requireArgument;
import static com.github.benmanes.caffeine.cache.Caffeine.requireState;
import static com.github.benmanes.caffeine.cache.LocalLoadingCache.newBulkMappingFunction;
Expand All @@ -30,6 +31,7 @@
import java.io.Serializable;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.AbstractCollection;
import java.util.AbstractMap;
import java.util.AbstractSet;
Expand Down Expand Up @@ -234,7 +236,7 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
writer = builder.getCacheWriter();
evictionLock = new ReentrantLock();
weigher = builder.getWeigher(isAsync);
drainBuffersTask = new PerformCleanupTask();
drainBuffersTask = new PerformCleanupTask(this);
nodeFactory = NodeFactory.newFactory(builder, isAsync);
data = new ConcurrentHashMap<>(builder.getInitialCapacity());
readBuffer = evicts() || collectKeys() || collectValues() || expiresAfterAccess()
Expand All @@ -247,11 +249,6 @@ protected BoundedLocalCache(Caffeine<K, V> builder,
}
}

static int ceilingPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << -Integer.numberOfLeadingZeros(x - 1);
}

/* --------------- Shared --------------- */

/** Returns if the node's value is currently being computed, asynchronously. */
Expand Down Expand Up @@ -339,7 +336,7 @@ public void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause
}
};
try {
executor().execute(task);
executor.execute(task);
} catch (Throwable t) {
logger.log(Level.SEVERE, "Exception thrown when submitting removal listener", t);
task.run();
Expand Down Expand Up @@ -370,6 +367,11 @@ protected ReferenceQueue<V> valueReferenceQueue() {

/* --------------- Expiration Support --------------- */

/** Returns the {@link Pacer} used to schedule the maintenance task. */
protected @Nullable Pacer pacer() {
return null;
}

/** Returns if the cache expires entries after a variable time threshold. */
protected boolean expiresVariable() {
return false;
Expand Down Expand Up @@ -780,6 +782,13 @@ void expireEntries() {
expireAfterAccessEntries(now);
expireAfterWriteEntries(now);
expireVariableEntries(now);

if (pacer() != null) {
long delay = getExpirationDelay(now);
if (delay != Long.MAX_VALUE) {
pacer().schedule(executor, drainBuffersTask, now, delay);
}
}
}

/** Expires entries in the access-order queue. */
Expand Down Expand Up @@ -833,6 +842,38 @@ void expireVariableEntries(long now) {
}
}

/** Returns the duration until the next item expires, or {@link Long.MAX_VALUE} if none. */
@GuardedBy("evictionLock")
private long getExpirationDelay(long now) {
long delay = Long.MAX_VALUE;
if (expiresAfterAccess()) {
Node<K, V> node = accessOrderWindowDeque().peekFirst();
if (node != null) {
delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos());
}
if (evicts()) {
node = accessOrderProbationDeque().peekFirst();
if (node != null) {
delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos());
}
node = accessOrderProtectedDeque().peekFirst();
if (node != null) {
delay = Math.min(delay, now - node.getAccessTime() + expiresAfterAccessNanos());
}
}
}
if (expiresAfterWrite()) {
Node<K, V> node = writeOrderDeque().peekFirst();
if (node != null) {
delay = Math.min(delay, now - node.getWriteTime() + expiresAfterWriteNanos());
}
}
if (expiresVariable()) {
delay = Math.min(delay, timerWheel().getExpirationDelay());
}
return delay;
}

/** Returns if the entry has expired. */
@SuppressWarnings("ShortCircuitBoolean")
boolean hasExpired(Node<K, V> node, long now) {
Expand Down Expand Up @@ -1375,7 +1416,7 @@ void scheduleDrainBuffers() {
return;
}
lazySetDrainStatus(PROCESSING_TO_IDLE);
executor().execute(drainBuffersTask);
executor.execute(drainBuffersTask);
} catch (Throwable t) {
logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
maintenance(/* ignored */ null);
Expand Down Expand Up @@ -3259,9 +3300,15 @@ public int characteristics() {
}

/** A reusable task that performs the maintenance work; used to avoid wrapping by ForkJoinPool. */
final class PerformCleanupTask extends ForkJoinTask<Void> implements Runnable {
static final class PerformCleanupTask extends ForkJoinTask<Void> implements Runnable {
private static final long serialVersionUID = 1L;

final WeakReference<BoundedLocalCache<?, ?>> reference;

PerformCleanupTask(BoundedLocalCache<?, ?> cache) {
reference = new WeakReference<BoundedLocalCache<?,?>>(cache);
}

@Override
public boolean exec() {
try {
Expand All @@ -3276,7 +3323,10 @@ public boolean exec() {

@Override
public void run() {
performCleanUp(/* ignored */ null);
BoundedLocalCache<?, ?> cache = reference.get();
if (cache != null) {
cache.performCleanUp(/* ignored */ null);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.logging.Level;
Expand Down Expand Up @@ -98,10 +99,11 @@
* <p>
* If {@linkplain #expireAfter(Expiry) expireAfter},
* {@linkplain #expireAfterWrite expireAfterWrite}, or
* {@linkplain #expireAfterAccess expireAfterAccess} is requested entries may be evicted on each
* cache modification, on occasional cache accesses, or on calls to {@link Cache#cleanUp}. Expired
* entries may be counted by {@link Cache#estimatedSize()}, but will never be visible to read or
* write operations.
* {@linkplain #expireAfterAccess expireAfterAccess} is requested then entries may be evicted on
* each cache modification, on occasional cache accesses, or on calls to {@link Cache#cleanUp}. A
* {@linkplain #scheduler(Scheduler)} may be specified to provide prompt removal of expired entries
* rather than waiting until activity triggers the periodic maintenance. Expired entries may be
* counted by {@link Cache#estimatedSize()}, but will never be visible to read or write operations.
* <p>
* If {@linkplain #weakKeys weakKeys}, {@linkplain #weakValues weakValues}, or
* {@linkplain #softValues softValues} are requested, it is possible for a key or value present in
Expand Down Expand Up @@ -159,6 +161,7 @@ enum Strength { WEAK, SOFT }
@Nullable CacheWriter<? super K, ? super V> writer;
@Nullable Weigher<? super K, ? super V> weigher;
@Nullable Expiry<? super K, ? super V> expiry;
@Nullable Scheduler scheduler;
@Nullable Executor executor;
@Nullable Ticker ticker;

Expand Down Expand Up @@ -195,6 +198,18 @@ static void requireState(boolean expression, String template, @Nullable Object..
}
}

/** Returns the smallest power of two greater than or equal to {@code x}. */
static int ceilingPowerOfTwo(int x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1 << -Integer.numberOfLeadingZeros(x - 1);
}

/** Returns the smallest power of two greater than or equal to {@code x}. */
static long ceilingPowerOfTwo(long x) {
// From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
return 1L << -Long.numberOfLeadingZeros(x - 1);
}

/**
* Constructs a new {@code Caffeine} instance with default settings, including strong keys, strong
* values, and no automatic eviction of any kind.
Expand Down Expand Up @@ -289,6 +304,43 @@ Executor getExecutor() {
return (executor == null) ? ForkJoinPool.commonPool() : executor;
}

/**
* Specifies the scheduler to use when scheduling routine maintenance based on an expiration
* event. This augments the periodic maintenance that occurs during normal cache operations to
* allow for the promptly removal of expired entries regardless of whether any cache activity is
* occurring at that time. By default, {@link Scheduler#disabledScheduler()} is used.
* <p>
* The scheduling between expiration events is paced to exploit batching and to minimize
* executions in short succession. This minimum difference between the scheduled executions is
* implementation-specific, currently at ~1 second (2^30 ns). In addition, the provided scheduler
* may not offer real-time guarantees (including {@link ThreadPoolExecutor}). The scheduling is
* best-effort and does not make any hard guarantees of when an expired entry will be removed.
* <p>
* <b>Note for Java 9 and later:</b> consider using {@link Scheduler#systemScheduler()} to
* leverage the dedicated, system-wide scheduling thread.
*
* @param scheduler the scheduler that submits a task to the {@link #executor(Executor)} after a
* given delay
* @return this {@code Caffeine} instance (for chaining)
* @throws NullPointerException if the specified scheduler is null
*/
@NonNull
public Caffeine<K, V> scheduler(@NonNull Scheduler scheduler) {
requireState(this.scheduler == null, "scheduler was already set to %s", this.scheduler);
this.scheduler = requireNonNull(scheduler);
return this;
}

@NonNull
Scheduler getScheduler() {
if ((scheduler == null) || (scheduler == Scheduler.disabledScheduler())) {
return Scheduler.disabledScheduler();
} else if (scheduler == Scheduler.systemScheduler()) {
return scheduler;
}
return Scheduler.guardedScheduler(scheduler);
}

/**
* Specifies the maximum number of entries the cache may contain. Note that the cache <b>may evict
* an entry before this limit is exceeded or temporarily exceed the threshold while evicting</b>.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,16 @@ interface LocalCache<K, V> extends ConcurrentMap<K, V> {
void notifyRemoval(@Nullable K key, @Nullable V value, RemovalCause cause);

/** Returns the {@link Executor} used by this cache. */
@NonNull
Executor executor();
@NonNull Executor executor();

/** Returns whether the cache captures the write time of the entry. */
boolean hasWriteTime();

/** Returns the {@link Ticker} used by this cache for expiration. */
@NonNull
Ticker expirationTicker();
@NonNull Ticker expirationTicker();

/** Returns the {@link Ticker} used by this cache for statistics. */
@NonNull
Ticker statsTicker();
@NonNull Ticker statsTicker();

/** See {@link Cache#estimatedSize()}. */
long estimatedSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package com.github.benmanes.caffeine.cache;

import static com.github.benmanes.caffeine.base.UnsafeAccess.UNSAFE;
import static com.github.benmanes.caffeine.cache.BoundedLocalCache.ceilingPowerOfTwo;
import static com.github.benmanes.caffeine.cache.Caffeine.ceilingPowerOfTwo;
import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ARRAY_BASE;
import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT;
import static com.github.benmanes.caffeine.cache.UnsafeRefArrayAccess.lvElement;
Expand Down
Loading

0 comments on commit 405f98a

Please sign in to comment.