From a10a8756d3daf4e26965c3ec3e16bde8c1871b5d Mon Sep 17 00:00:00 2001 From: "Guus C. Bloemsma" Date: Wed, 8 May 2019 16:12:16 +0200 Subject: [PATCH] Coalescing bulkloader sample (#336) --- examples/coalescing-bulkloader/pom.xml | 47 +++++ .../bulkloader/CoalescingBulkloader.java | 192 ++++++++++++++++++ .../bulkloader/CoalescingBulkloaderTest.java | 164 +++++++++++++++ travis.sh | 1 + 4 files changed, 404 insertions(+) create mode 100644 examples/coalescing-bulkloader/pom.xml create mode 100644 examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java create mode 100644 examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java diff --git a/examples/coalescing-bulkloader/pom.xml b/examples/coalescing-bulkloader/pom.xml new file mode 100644 index 0000000000..3b118e638d --- /dev/null +++ b/examples/coalescing-bulkloader/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + org.github.benmanes.caffeine.examples + coalescing-bulkloader + 1.0-SNAPSHOT + + + 1.8 + 2.7.0 + 4.12 + 3.1.6 + + + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + + junit + junit + ${junit.version} + + + org.awaitility + awaitility + ${awaitility.version} + + + + + + + maven-compiler-plugin + + ${java.version} + ${java.version} + + + + + diff --git a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java new file mode 100644 index 0000000000..5c2087e89b --- /dev/null +++ b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java @@ -0,0 +1,192 @@ +/* + * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.examples.coalescing.bulkloader; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toMap; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * An implementation of {@link AsyncCacheLoader} that delays fetching a bit until "enough" keys are collected + * to do a bulk call. The assumption is that doing a bulk call is so much more efficient that it is worth + * the wait. + * + * @param the type of the key in the cache + * @param the type of the value in the cache + * @author complain to: guus@bloemsma.net + */ +public class CoalescingBulkloader implements AsyncCacheLoader { + private final Consumer> bulkLoader; + private int maxLoadSize; // maximum number of keys to load in one call + private long maxDelay; // maximum time between request of a value and loading it + private volatile Queue waitingKeys = new ConcurrentLinkedQueue<>(); + private ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + private ScheduledFuture schedule; + // Queue.size() is expensive, so here we keep track of the queue size separately + private AtomicInteger size = new AtomicInteger(0); + + private final class WaitingKey { + Key key; + CompletableFuture future; + long waitingSince; + } + + /** + * Wraps a bulk loader that returns values in the same order as the keys. + * + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param load Loader that takes keys and returns a future with the values in the same order as the keys. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byOrder(int maxLoadSize, long maxDelay, + final Function, CompletableFuture>> load) { + return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> { + final Stream keys = toLoad.stream().map(wk -> wk.key); + load.apply(keys).thenAccept(values -> { + final Iterator iv = values.iterator(); + for (CoalescingBulkloader.WaitingKey waitingKey : toLoad) { + if (iv.hasNext()) + waitingKey.future.complete(iv.next()); + else + waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key)); + } + }); + }); + } + + /** + * Wraps a bulk loader that returns values in a map accessed by key. + * + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param load Loader that takes keys and returns a future with a map with keys and values. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byMap(int maxLoadSize, long maxDelay, + final Function, CompletableFuture>> load) { + return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> { + final Stream keys = toLoad.stream().map(wk -> wk.key); + load.apply(keys).thenAccept(values -> { + for (CoalescingBulkloader.WaitingKey waitingKey : toLoad) { + if (values.containsKey(waitingKey.key)) + waitingKey.future.complete(values.get(waitingKey.key)); + else + waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key)); + } + }); + }); + } + + /** + * Wraps a bulk loader that returns intermediate values from which keys and values can be extracted. + * + * @param Some internal type from which keys and values can be extracted. + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param keyExtractor How to extract key from intermediate value + * @param valueExtractor How to extract value from intermediate value + * @param load Loader that takes keys and returns a future with a map with keys and values. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byExtraction(int maxLoadSize, long maxDelay, + final Function keyExtractor, + final Function valueExtractor, + final Function, CompletableFuture>> load) { + return byMap(maxLoadSize, maxDelay, + keys -> load.apply(keys).thenApply( + intermediates -> intermediates.collect(toMap(keyExtractor, valueExtractor)))); + } + + private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer> bulkLoader) { + this.bulkLoader = bulkLoader; + assert maxLoadSize > 0; + assert maxDelay > 0; + this.maxLoadSize = maxLoadSize; + this.maxDelay = maxDelay; + } + + @Override public @NonNull CompletableFuture asyncLoad(@NonNull Key key, @NonNull Executor executor) { + final WaitingKey waitingKey = new WaitingKey(); + waitingKey.key = key; + waitingKey.future = new CompletableFuture<>(); + waitingKey.waitingSince = System.currentTimeMillis(); + waitingKeys.add(waitingKey); + + if (size.incrementAndGet() >= maxLoadSize) { + doLoad(); + } else if (schedule == null || schedule.isDone()) { + startWaiting(); + } + + return waitingKey.future; + } + + synchronized private void startWaiting() { + schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS); + } + + synchronized private void doLoad() { + schedule.cancel(false); + do { + List toLoad = new ArrayList<>(Math.min(size.get(), maxLoadSize)); + int counter = maxLoadSize; + while (counter > 0) { + final WaitingKey waitingKey = waitingKeys.poll(); + if (waitingKey == null) + break; + else { + toLoad.add(waitingKey); + counter--; + } + } + + final int taken = maxLoadSize - counter; + if (taken > 0) { + bulkLoader.accept(toLoad); + size.updateAndGet(oldSize -> oldSize - taken); + } + + } while (size.get() >= maxLoadSize); + final WaitingKey nextWaitingKey = waitingKeys.peek(); + if (nextWaitingKey != null) { + schedule = timer.schedule(this::doLoad, nextWaitingKey.waitingSince + maxDelay - System.currentTimeMillis(), MILLISECONDS); + } + } + +} diff --git a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java new file mode 100644 index 0000000000..8d00f5de81 --- /dev/null +++ b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java @@ -0,0 +1,164 @@ +/* + * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.examples.coalescing.bulkloader; + +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byExtraction; +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byMap; +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byOrder; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; +import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Stream; +import org.awaitility.Awaitility; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.junit.runners.model.Statement; + +@RunWith(Parameterized.class) +public final class CoalescingBulkloaderTest { + + private final Function, Stream>, CoalescingBulkloader> cbl; + + public CoalescingBulkloaderTest(Function, Stream>, CoalescingBulkloader> cbl) { + this.cbl = cbl; + } + + private final static int maxLoadSize = 10; + private final static int maxDelay = 50; + private final static int delta = 5; + private final static int actualLoadTime = 20; + + @Parameters + public static List, Stream>, CoalescingBulkloader>> loaderTypes() { + return Arrays.asList( + fun -> byOrder(maxLoadSize, maxDelay, + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints))), + fun -> byMap(maxLoadSize, maxDelay, + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints).collect(toMap(identity(), identity())))), + fun -> byExtraction(maxLoadSize, maxDelay, identity(), identity(), + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints))) + ); + } + + @NonNull private AsyncLoadingCache createCache(AtomicInteger loaderCalled) { + return Caffeine.newBuilder().buildAsync(cbl.apply(ints -> { + loaderCalled.incrementAndGet(); + try { + Thread.sleep(actualLoadTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return ints; + })); + } + + @Test + public void maxDelayIsNotMissedTooMuch() throws InterruptedException { + AtomicInteger loaderCalled = new AtomicInteger(0); + final AsyncLoadingCache cache = createCache(loaderCalled); + + // a cache get won't take too long + final CompletableFuture result = cache.get(1); + Awaitility.await().pollThread(Thread::new).pollInterval(1, MILLISECONDS) + .between(maxDelay - delta, MILLISECONDS, maxDelay + delta, MILLISECONDS) + .untilAtomic(loaderCalled, is(1)); + assertFalse("delay in load", result.isDone()); + Thread.sleep(actualLoadTime); + assertThat(result.getNow(0), is(1)); + } + + @Test + public void whenEnoughKeysAreRequestedTheLoadWillHappenImmediately() throws InterruptedException { + AtomicInteger loaderCalled = new AtomicInteger(0); + final AsyncLoadingCache cache = createCache(loaderCalled); + + CompletableFuture[] results = new CompletableFuture[maxLoadSize]; + for (int i = 0; i < maxLoadSize - 1; i++) + results[i] = cache.get(i); + Thread.sleep(delta); + // requesting 9 keys does not trigger a load + assertThat(loaderCalled.get(), is(0)); + + for (int i = 0; i < maxLoadSize - 1; i++) { + final CompletableFuture result = cache.get(i); + assertThat(result, sameInstance(results[i])); + assertFalse("no load therefore unknown result", result.isDone()); + } + Thread.sleep(delta); + // requesting the same 9 keys still doesn't trigger a load + assertThat(loaderCalled.get(), is(0)); + + // requesting one more key will trigger immediately + results[maxLoadSize - 1] = cache.get(maxLoadSize - 1); + Awaitility.await().pollInterval(1, MILLISECONDS) + .atMost(delta, MILLISECONDS) + .untilAtomic(loaderCalled, is(Integer.valueOf(1))); + + // values are not immediately available because of the sleep in the loader + for (int i = 0; i < maxLoadSize; i++) { + assertThat(results[i].getNow(-1), is(-1)); + } + Thread.sleep(actualLoadTime + delta); + // slept enough + for (int i = 0; i < maxLoadSize; i++) { + assertThat(results[i].getNow(-1), is(i)); + } + + } + + @Rule + // Because the jvm may have to warm up or whatever other influences, timing may be off, causing these tests to fail. + // So retry a couple of times. + public TestRule retry = (final Statement base, final Description description) -> new Statement() { + + @Override + public void evaluate() throws Throwable { + try_(3); + } + + void try_(int tries) throws Throwable { + try { + base.evaluate(); + } catch (Throwable throwable) { + System.err.println(description.getDisplayName() + " failed, " + (tries - 1) + " attempts left."); + if (tries > 1) + try_(tries - 1); + else { + throw throwable; + } + } + } + }; + +} diff --git a/travis.sh b/travis.sh index 9ce4c68548..0f65e591ee 100755 --- a/travis.sh +++ b/travis.sh @@ -24,6 +24,7 @@ case "${1:?''}" in run "./gradlew spotbugsJavaPoet spotbugsMain spotbugsJmh -Dspotbugs --console plain" run "sh -c 'cd examples/stats-metrics && ./gradlew test --console plain --no-daemon'" run "sh -c 'cd examples/write-behind-rxjava && mvn test'" + run "sh -c 'cd examples/coalescing-bulkloader && mvn test'" ;; tests) run "./gradlew check --console plain"