Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some cleanup #620

Closed
wants to merge 11 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package com.github.benmanes.caffeine.examples.coalescing.bulkloader;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand All @@ -37,6 +35,9 @@
import java.util.function.Function;
import java.util.stream.Stream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;

/**
* An implementation of {@link AsyncCacheLoader} that delays fetching a bit until "enough" keys are collected
* to do a bulk call. The assumption is that doing a bulk call is so much more efficient that it is worth
Expand All @@ -47,19 +48,23 @@
* @author complain to: [email protected]
*/
public class CoalescingBulkloader<Key, Value> implements AsyncCacheLoader<Key, Value> {
private final Consumer<Collection<WaitingKey>> bulkLoader;
private int maxLoadSize; // maximum number of keys to load in one call
private long maxDelay; // maximum time between request of a value and loading it
private volatile Queue<WaitingKey> waitingKeys = new ConcurrentLinkedQueue<>();
private ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
private final Consumer<Collection<WaitingKey<Key, Value>>> bulkLoader;
private final int maxLoadSize; // maximum number of keys to load in one call
private final long maxDelay; // maximum time between request of a value and loading it
private final Queue<WaitingKey<Key, Value>> waitingKeys = new ConcurrentLinkedQueue<>();
private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> schedule;
// Queue.size() is expensive, so here we keep track of the queue size separately
private AtomicInteger size = new AtomicInteger(0);
private final AtomicInteger size = new AtomicInteger(0);

private final class WaitingKey {
Key key;
CompletableFuture<Value> future;
long waitingSince;
private static class WaitingKey<Key, Value> {
private final Key key;
private final CompletableFuture<Value> future = new CompletableFuture<>();
private final long waitingSince = System.currentTimeMillis();

private WaitingKey(Key key) {
this.key = key;
}
}

/**
Expand All @@ -71,13 +76,15 @@ private final class WaitingKey {
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value> CoalescingBulkloader<Key, Value> byOrder(int maxLoadSize, long maxDelay,
public static <Key, Value> CoalescingBulkloader<Key, Value> byOrder(
int maxLoadSize,
long maxDelay,
final Function<Stream<Key>, CompletableFuture<Stream<Value>>> load) {
return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> {
final Stream<Key> keys = toLoad.stream().map(wk -> wk.key);
load.apply(keys).thenAccept(values -> {
final Iterator<Value> iv = values.iterator();
for (CoalescingBulkloader<Key, Value>.WaitingKey waitingKey : toLoad) {
for (CoalescingBulkloader.WaitingKey<Key, Value> waitingKey : toLoad) {
if (iv.hasNext())
waitingKey.future.complete(iv.next());
else
Expand All @@ -96,12 +103,14 @@ public static <Key, Value> CoalescingBulkloader<Key, Value> byOrder(int maxLoadS
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value> CoalescingBulkloader<Key, Value> byMap(int maxLoadSize, long maxDelay,
public static <Key, Value> CoalescingBulkloader<Key, Value> byMap(
int maxLoadSize,
long maxDelay,
final Function<Stream<Key>, CompletableFuture<Map<Key, Value>>> load) {
return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> {
final Stream<Key> keys = toLoad.stream().map(wk -> wk.key);
load.apply(keys).thenAccept(values -> {
for (CoalescingBulkloader<Key, Value>.WaitingKey waitingKey : toLoad) {
for (CoalescingBulkloader.WaitingKey<Key, Value> waitingKey : toLoad) {
if (values.containsKey(waitingKey.key))
waitingKey.future.complete(values.get(waitingKey.key));
else
Expand All @@ -123,7 +132,9 @@ public static <Key, Value> CoalescingBulkloader<Key, Value> byMap(int maxLoadSiz
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value, Intermediate> CoalescingBulkloader<Key, Value> byExtraction(int maxLoadSize, long maxDelay,
public static <Key, Value, Intermediate> CoalescingBulkloader<Key, Value> byExtraction(
int maxLoadSize,
long maxDelay,
final Function<Intermediate, Key> keyExtractor,
final Function<Intermediate, Value> valueExtractor,
final Function<Stream<Key>, CompletableFuture<Stream<Intermediate>>> load) {
Expand All @@ -132,19 +143,17 @@ public static <Key, Value, Intermediate> CoalescingBulkloader<Key, Value> byExtr
intermediates -> intermediates.collect(toMap(keyExtractor, valueExtractor))));
}

private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer<Collection<WaitingKey>> bulkLoader) {
private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer<Collection<WaitingKey<Key, Value>>> bulkLoader) {
this.bulkLoader = bulkLoader;
assert maxLoadSize > 0;
assert maxDelay > 0;
this.maxLoadSize = maxLoadSize;
this.maxDelay = maxDelay;
}

@Override public CompletableFuture<Value> asyncLoad(Key key, Executor executor) {
final WaitingKey waitingKey = new WaitingKey();
waitingKey.key = key;
waitingKey.future = new CompletableFuture<>();
waitingKey.waitingSince = System.currentTimeMillis();
@Override
public CompletableFuture<Value> asyncLoad(Key key, Executor executor) {
final WaitingKey<Key, Value> waitingKey = new WaitingKey<>(key);
waitingKeys.add(waitingKey);

if (size.incrementAndGet() >= maxLoadSize) {
Expand All @@ -156,17 +165,18 @@ private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer<Collection
return waitingKey.future;
}

synchronized private void startWaiting() {
private synchronized void startWaiting() {
if (schedule != null)
schedule.cancel(false);
schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
}

synchronized private void doLoad() {
schedule.cancel(false);
private synchronized void doLoad() {
do {
List<WaitingKey> toLoad = new ArrayList<>(Math.min(size.get(), maxLoadSize));
List<WaitingKey<Key, Value>> toLoad = new ArrayList<>(Math.min(size.get(), maxLoadSize));
int counter = maxLoadSize;
while (counter > 0) {
final WaitingKey waitingKey = waitingKeys.poll();
final WaitingKey<Key, Value> waitingKey = waitingKeys.poll();
if (waitingKey == null)
break;
else {
Expand All @@ -182,7 +192,7 @@ synchronized private void doLoad() {
}

} while (size.get() >= maxLoadSize);
final WaitingKey nextWaitingKey = waitingKeys.peek();
final WaitingKey<Key, Value> nextWaitingKey = waitingKeys.peek();
if (nextWaitingKey != null) {
schedule = timer.schedule(this::doLoad, nextWaitingKey.waitingSince + maxDelay - System.currentTimeMillis(), MILLISECONDS);
}
Expand Down