Skip to content

Commit

Permalink
Coalescing bulkloader sample (#336)
Browse files Browse the repository at this point in the history
  • Loading branch information
sheepdreamofandroids authored and ben-manes committed Sep 28, 2019
1 parent 50515e2 commit a10a875
Show file tree
Hide file tree
Showing 4 changed files with 404 additions and 0 deletions.
47 changes: 47 additions & 0 deletions examples/coalescing-bulkloader/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.github.benmanes.caffeine.examples</groupId>
<artifactId>coalescing-bulkloader</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<java.version>1.8</java.version>
<caffeine.version>2.7.0</caffeine.version>
<junit.version>4.12</junit.version>
<awaitility.version>3.1.6</awaitility.version>
</properties>

<dependencies>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright 2019 Guus C. Bloemsma. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.benmanes.caffeine.examples.coalescing.bulkloader;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.toMap;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
* An implementation of {@link AsyncCacheLoader} that delays fetching a bit until "enough" keys are collected
* to do a bulk call. The assumption is that doing a bulk call is so much more efficient that it is worth
* the wait.
*
* @param <Key> the type of the key in the cache
* @param <Value> the type of the value in the cache
* @author complain to: [email protected]
*/
public class CoalescingBulkloader<Key, Value> implements AsyncCacheLoader<Key, Value> {
private final Consumer<Collection<WaitingKey>> bulkLoader;
private int maxLoadSize; // maximum number of keys to load in one call
private long maxDelay; // maximum time between request of a value and loading it
private volatile Queue<WaitingKey> waitingKeys = new ConcurrentLinkedQueue<>();
private ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
private ScheduledFuture<?> schedule;
// Queue.size() is expensive, so here we keep track of the queue size separately
private AtomicInteger size = new AtomicInteger(0);

private final class WaitingKey {
Key key;
CompletableFuture<Value> future;
long waitingSince;
}

/**
* Wraps a bulk loader that returns values in the same order as the keys.
*
* @param maxLoadSize Maximum number of keys per bulk load
* @param maxDelay Maximum time to wait before bulk load is executed
* @param load Loader that takes keys and returns a future with the values in the same order as the keys.
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value> CoalescingBulkloader<Key, Value> byOrder(int maxLoadSize, long maxDelay,
final Function<Stream<Key>, CompletableFuture<Stream<Value>>> load) {
return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> {
final Stream<Key> keys = toLoad.stream().map(wk -> wk.key);
load.apply(keys).thenAccept(values -> {
final Iterator<Value> iv = values.iterator();
for (CoalescingBulkloader<Key, Value>.WaitingKey waitingKey : toLoad) {
if (iv.hasNext())
waitingKey.future.complete(iv.next());
else
waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key));
}
});
});
}

/**
* Wraps a bulk loader that returns values in a map accessed by key.
*
* @param maxLoadSize Maximum number of keys per bulk load
* @param maxDelay Maximum time to wait before bulk load is executed
* @param load Loader that takes keys and returns a future with a map with keys and values.
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value> CoalescingBulkloader<Key, Value> byMap(int maxLoadSize, long maxDelay,
final Function<Stream<Key>, CompletableFuture<Map<Key, Value>>> load) {
return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> {
final Stream<Key> keys = toLoad.stream().map(wk -> wk.key);
load.apply(keys).thenAccept(values -> {
for (CoalescingBulkloader<Key, Value>.WaitingKey waitingKey : toLoad) {
if (values.containsKey(waitingKey.key))
waitingKey.future.complete(values.get(waitingKey.key));
else
waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key));
}
});
});
}

/**
* Wraps a bulk loader that returns intermediate values from which keys and values can be extracted.
*
* @param <Intermediate> Some internal type from which keys and values can be extracted.
* @param maxLoadSize Maximum number of keys per bulk load
* @param maxDelay Maximum time to wait before bulk load is executed
* @param keyExtractor How to extract key from intermediate value
* @param valueExtractor How to extract value from intermediate value
* @param load Loader that takes keys and returns a future with a map with keys and values.
* Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException}
* for the corresponding future.
*/
public static <Key, Value, Intermediate> CoalescingBulkloader<Key, Value> byExtraction(int maxLoadSize, long maxDelay,
final Function<Intermediate, Key> keyExtractor,
final Function<Intermediate, Value> valueExtractor,
final Function<Stream<Key>, CompletableFuture<Stream<Intermediate>>> load) {
return byMap(maxLoadSize, maxDelay,
keys -> load.apply(keys).thenApply(
intermediates -> intermediates.collect(toMap(keyExtractor, valueExtractor))));
}

private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer<Collection<WaitingKey>> bulkLoader) {
this.bulkLoader = bulkLoader;
assert maxLoadSize > 0;
assert maxDelay > 0;
this.maxLoadSize = maxLoadSize;
this.maxDelay = maxDelay;
}

@Override public @NonNull CompletableFuture<Value> asyncLoad(@NonNull Key key, @NonNull Executor executor) {
final WaitingKey waitingKey = new WaitingKey();
waitingKey.key = key;
waitingKey.future = new CompletableFuture<>();
waitingKey.waitingSince = System.currentTimeMillis();
waitingKeys.add(waitingKey);

if (size.incrementAndGet() >= maxLoadSize) {
doLoad();
} else if (schedule == null || schedule.isDone()) {
startWaiting();
}

return waitingKey.future;
}

synchronized private void startWaiting() {
schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
}

synchronized private void doLoad() {
schedule.cancel(false);
do {
List<WaitingKey> toLoad = new ArrayList<>(Math.min(size.get(), maxLoadSize));
int counter = maxLoadSize;
while (counter > 0) {
final WaitingKey waitingKey = waitingKeys.poll();
if (waitingKey == null)
break;
else {
toLoad.add(waitingKey);
counter--;
}
}

final int taken = maxLoadSize - counter;
if (taken > 0) {
bulkLoader.accept(toLoad);
size.updateAndGet(oldSize -> oldSize - taken);
}

} while (size.get() >= maxLoadSize);
final WaitingKey nextWaitingKey = waitingKeys.peek();
if (nextWaitingKey != null) {
schedule = timer.schedule(this::doLoad, nextWaitingKey.waitingSince + maxDelay - System.currentTimeMillis(), MILLISECONDS);
}
}

}
Loading

0 comments on commit a10a875

Please sign in to comment.