From d35e86e67f6e462a55e2fb461dd74c7930448b68 Mon Sep 17 00:00:00 2001 From: "Guus C. Bloemsma" Date: Wed, 8 May 2019 16:12:16 +0200 Subject: [PATCH 1/4] Initial commit --- examples/coalescing-bulkloader/pom.xml | 47 ++++++ .../bulkloader/CoalescingBulkloader.java | 119 +++++++++++++++ .../bulkloader/CoalescingBulkloaderTest.java | 136 ++++++++++++++++++ 3 files changed, 302 insertions(+) create mode 100644 examples/coalescing-bulkloader/pom.xml create mode 100644 examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java create mode 100644 examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java diff --git a/examples/coalescing-bulkloader/pom.xml b/examples/coalescing-bulkloader/pom.xml new file mode 100644 index 0000000000..3b118e638d --- /dev/null +++ b/examples/coalescing-bulkloader/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + org.github.benmanes.caffeine.examples + coalescing-bulkloader + 1.0-SNAPSHOT + + + 1.8 + 2.7.0 + 4.12 + 3.1.6 + + + + + com.github.ben-manes.caffeine + caffeine + ${caffeine.version} + + + junit + junit + ${junit.version} + + + org.awaitility + awaitility + ${awaitility.version} + + + + + + + maven-compiler-plugin + + ${java.version} + ${java.version} + + + + + diff --git a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java new file mode 100644 index 0000000000..26bf32a64f --- /dev/null +++ b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java @@ -0,0 +1,119 @@ +/* + * Copyright 2016 Wim Deblauwe. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.examples.coalescing.bulkloader; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * An implementation of {@link AsyncCacheLoader} that delays fetching a bit until "enough" keys are collected + * to do a bulk call. The assumption is that doing a bulk call is so much more efficient that it is worth + * the wait. + * + * @param the type of the key in the cache + * @param the type of the value in the cache + * @author complain to: guus@bloemsma.net + */ +public abstract class CoalescingBulkloader implements AsyncCacheLoader { + private int maxLoadSize; // maximum number of keys to load in one call + private long maxDelay; // maximum time between request of a value and loading it + private volatile Queue waitingKeys = new ConcurrentLinkedQueue<>(); + private ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(); + private ScheduledFuture schedule; + // Queue.size() is expensive, so here we keep track of the queue size separately + private AtomicInteger size = new AtomicInteger(0); + + private final class WaitingKey { + Key key; + CompletableFuture value; + long waitingSince; + } + + public CoalescingBulkloader(int maxLoadSize, long maxDelay) { + assert maxLoadSize > 0; + assert maxDelay > 0; + this.maxLoadSize = maxLoadSize; + this.maxDelay = maxDelay; + } + + @Override public @NonNull CompletableFuture asyncLoad(@NonNull Key key, @NonNull Executor executor) { + if (schedule == null || schedule.isDone()) { + startWaiting(); + } + + final WaitingKey waitingKey = new WaitingKey(); + waitingKey.key = key; + waitingKey.value = new CompletableFuture<>(); + waitingKey.waitingSince = System.currentTimeMillis(); + waitingKeys.add(waitingKey); + + if (size.incrementAndGet() >= maxLoadSize) { + doLoad(); + } + return waitingKey.value; + } + + synchronized private void startWaiting() { + schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS); + } + + synchronized private void doLoad() { + schedule.cancel(false); + do { + Map> toLoad = new HashMap<>(Math.min(size.get(), maxLoadSize)); + int counter = maxLoadSize; + while (counter > 0) { + final WaitingKey waitingKey = waitingKeys.poll(); + if (waitingKey == null) + break; + else { + toLoad.put(waitingKey.key, waitingKey.value); + counter--; + } + } + + final int taken = maxLoadSize - counter; + if (taken > 0) { + load(toLoad); + size.updateAndGet(oldSize -> oldSize - taken); + } + + } while (size.get() >= maxLoadSize); + final WaitingKey nextWaitingKey = waitingKeys.peek(); + if (nextWaitingKey != null) { + schedule = timer.schedule(this::doLoad, nextWaitingKey.waitingSince + maxDelay - System.currentTimeMillis(), MILLISECONDS); + } + } + + /** + * Must be implemented by user. It is expected to asynchronously load all keys and complete the corresponding futures + * with the correct values. When done each future must be completed with the correct value. + */ + protected abstract void load(Map> toLoad); + +} diff --git a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java new file mode 100644 index 0000000000..798bbd8780 --- /dev/null +++ b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java @@ -0,0 +1,136 @@ +/* + * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.examples.coalescing.bulkloader; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +import com.github.benmanes.caffeine.cache.AsyncLoadingCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; +import org.awaitility.Awaitility; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +public final class CoalescingBulkloaderTest { + + @NonNull private AsyncLoadingCache createCache(AtomicInteger loaderCalled) { + return Caffeine.newBuilder().buildAsync( + new CoalescingBulkloader(10, 50) { + @Override protected void load(Map> toLoad) { + ForkJoinPool.commonPool().execute(() -> { + loaderCalled.incrementAndGet(); + try { + Thread.sleep(20); + toLoad.forEach((k, v) -> v.complete(k)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + } + }); + } + + @Test + public void maxDelayIsNotMissedTooMuch() throws ExecutionException, InterruptedException { + AtomicInteger loaderCalled = new AtomicInteger(0); + final AsyncLoadingCache cache = createCache(loaderCalled); + + // a cache get won't take too long + final CompletableFuture result = cache.get(1); + Awaitility.await().pollThread(Thread::new).pollInterval(1, MILLISECONDS) + .between(45, MILLISECONDS, 55, MILLISECONDS) + .untilAtomic(loaderCalled, is(1)); + assertTrue("delay in load", !result.isDone()); + Thread.sleep(20); + assertThat(result.getNow(0), is(1)); + } + + @Test + public void whenEnoughKeysAreRequestedTheLoadWillHappenImmediately() throws InterruptedException { + AtomicInteger loaderCalled = new AtomicInteger(0); + final AsyncLoadingCache cache = createCache(loaderCalled); + + CompletableFuture[] results = new CompletableFuture[10]; + for (int i = 0; i < 9; i++) + results[i] = cache.get(i); + Thread.sleep(5); + // requesting 9 keys does not trigger a load + assertThat(loaderCalled.get(), is(0)); + + for (int i = 0; i < 9; i++) { + final CompletableFuture result = cache.get(i); + assertThat(result, sameInstance(results[i])); + assertTrue("no load therefore unknown result", !result.isDone()); + } + Thread.sleep(5); + // requesting the same 9 keys still doesn't trigger a load + assertThat(loaderCalled.get(), is(0)); + + // requesting the 10 key will trigger immediately + results[9] = cache.get(9); + Awaitility.await().pollInterval(1, MILLISECONDS) + .atMost(15, MILLISECONDS) + .untilAtomic(loaderCalled, is(Integer.valueOf(1))); + + // values are not immediately available because of the sleep in the loader + for (int i = 0; i < 10; i++) { + assertThat(results[i].getNow(-1), is(-1)); + } + Thread.sleep(20); + // slept enough + for (int i = 0; i < 10; i++) { + assertThat(results[i].getNow(-1), is(i)); + } + + } + + @Rule + // Because the jvm may have to warm up or whatever other influences, timing may be off, causing these tests to fail. + // So retry a couple of times. + public TestRule retry = (final Statement base, final Description description) -> new Statement() { + + @Override + public void evaluate() throws Throwable { + trie(3); + } + + void trie(int tries) throws Throwable { + try { + base.evaluate(); + } catch (Throwable throwable) { + System.err.println(description.getDisplayName() + " failed, " + (tries - 1) + " attempts left."); + if(tries>1) + trie(tries-1); + else { + throw throwable; + } + } + } + }; + +} From 30ad8e9e22d2bca5b55daf26ed6ddd05ec736737 Mon Sep 17 00:00:00 2001 From: "Guus C. Bloemsma" Date: Sat, 11 May 2019 17:03:03 +0200 Subject: [PATCH 2/4] better interface for different usecases --- .../bulkloader/CoalescingBulkloader.java | 111 +++++++++++++--- .../bulkloader/CoalescingBulkloaderTest.java | 120 +++++++++++------- 2 files changed, 166 insertions(+), 65 deletions(-) diff --git a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java index 26bf32a64f..27433101df 100644 --- a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java +++ b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java @@ -16,10 +16,15 @@ package com.github.benmanes.caffeine.examples.coalescing.bulkloader; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.stream.Collectors.toMap; import com.github.benmanes.caffeine.cache.AsyncCacheLoader; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -28,6 +33,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -39,7 +47,8 @@ * @param the type of the value in the cache * @author complain to: guus@bloemsma.net */ -public abstract class CoalescingBulkloader implements AsyncCacheLoader { +public class CoalescingBulkloader implements AsyncCacheLoader { + private final Consumer> bulkLoader; private int maxLoadSize; // maximum number of keys to load in one call private long maxDelay; // maximum time between request of a value and loading it private volatile Queue waitingKeys = new ConcurrentLinkedQueue<>(); @@ -50,11 +59,82 @@ public abstract class CoalescingBulkloader implements AsyncCacheLoad private final class WaitingKey { Key key; - CompletableFuture value; + CompletableFuture future; long waitingSince; } - public CoalescingBulkloader(int maxLoadSize, long maxDelay) { + /** + * Wraps a bulk loader that returns values in the same order as the keys. + * + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param load Loader that takes keys and returns a future with the values in the same order as the keys. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byOrder(int maxLoadSize, long maxDelay, + final Function, CompletableFuture>> load) { + return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> { + final Stream keys = toLoad.stream().map(wk -> wk.key); + load.apply(keys).thenAccept(values -> { + final Iterator iv = values.iterator(); + for (CoalescingBulkloader.WaitingKey waitingKey : toLoad) { + if (iv.hasNext()) + waitingKey.future.complete(iv.next()); + else + waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key)); + } + }); + }); + } + + /** + * Wraps a bulk loader that returns values in a map accessed by key. + * + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param load Loader that takes keys and returns a future with a map with keys and values. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byMap(int maxLoadSize, long maxDelay, + final Function, CompletableFuture>> load) { + return new CoalescingBulkloader<>(maxLoadSize, maxDelay, toLoad -> { + final Stream keys = toLoad.stream().map(wk -> wk.key); + load.apply(keys).thenAccept(values -> { + for (CoalescingBulkloader.WaitingKey waitingKey : toLoad) { + if (values.containsKey(waitingKey.key)) + waitingKey.future.complete(values.get(waitingKey.key)); + else + waitingKey.future.completeExceptionally(new NoSuchElementException("No value for key " + waitingKey.key)); + } + }); + }); + } + + /** + * Wraps a bulk loader that returns intermediate values from which keys and values can be extracted. + * + * @param Some internal type from which keys and values can be extracted. + * @param maxLoadSize Maximum number of keys per bulk load + * @param maxDelay Maximum time to wait before bulk load is executed + * @param keyExtractor How to extract key from intermediate value + * @param valueExtractor How to extract value from intermediate value + * @param load Loader that takes keys and returns a future with a map with keys and values. + * Extra values are ignored. Missing values lead to a {@link java.util.NoSuchElementException} + * for the corresponding future. + */ + public static CoalescingBulkloader byExtraction(int maxLoadSize, long maxDelay, + final Function keyExtractor, + final Function valueExtractor, + final Function, CompletableFuture>> load) { + return byMap(maxLoadSize, maxDelay, + keys -> load.apply(keys).thenApply( + intermediates -> intermediates.collect(toMap(keyExtractor, valueExtractor)))); + } + + private CoalescingBulkloader(int maxLoadSize, long maxDelay, Consumer> bulkLoader) { + this.bulkLoader = bulkLoader; assert maxLoadSize > 0; assert maxDelay > 0; this.maxLoadSize = maxLoadSize; @@ -62,20 +142,19 @@ public CoalescingBulkloader(int maxLoadSize, long maxDelay) { } @Override public @NonNull CompletableFuture asyncLoad(@NonNull Key key, @NonNull Executor executor) { - if (schedule == null || schedule.isDone()) { - startWaiting(); - } - final WaitingKey waitingKey = new WaitingKey(); waitingKey.key = key; - waitingKey.value = new CompletableFuture<>(); + waitingKey.future = new CompletableFuture<>(); waitingKey.waitingSince = System.currentTimeMillis(); waitingKeys.add(waitingKey); if (size.incrementAndGet() >= maxLoadSize) { doLoad(); + } else if (schedule == null || schedule.isDone()) { + startWaiting(); } - return waitingKey.value; + + return waitingKey.future; } synchronized private void startWaiting() { @@ -85,21 +164,21 @@ synchronized private void startWaiting() { synchronized private void doLoad() { schedule.cancel(false); do { - Map> toLoad = new HashMap<>(Math.min(size.get(), maxLoadSize)); + List toLoad = new ArrayList<>(Math.min(size.get(), maxLoadSize)); int counter = maxLoadSize; while (counter > 0) { final WaitingKey waitingKey = waitingKeys.poll(); if (waitingKey == null) break; else { - toLoad.put(waitingKey.key, waitingKey.value); + toLoad.add(waitingKey); counter--; } } final int taken = maxLoadSize - counter; if (taken > 0) { - load(toLoad); + bulkLoader.accept(toLoad); size.updateAndGet(oldSize -> oldSize - taken); } @@ -110,10 +189,4 @@ synchronized private void doLoad() { } } - /** - * Must be implemented by user. It is expected to asynchronously load all keys and complete the corresponding futures - * with the correct values. When done each future must be completed with the correct value. - */ - protected abstract void load(Map> toLoad); - } diff --git a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java index 798bbd8780..fd349476e3 100644 --- a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java +++ b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. + * Copyright actualLoadTime19 Guus C. Bloemsma. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,58 +15,86 @@ */ package com.github.benmanes.caffeine.examples.coalescing.bulkloader; +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byExtraction; +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byMap; +import static com.github.benmanes.caffeine.examples.coalescing.bulkloader.CoalescingBulkloader.byOrder; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; import static org.hamcrest.Matchers.sameInstance; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import com.github.benmanes.caffeine.cache.AsyncLoadingCache; import com.github.benmanes.caffeine.cache.Caffeine; -import java.util.Map; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Stream; import org.awaitility.Awaitility; import org.checkerframework.checker.nullness.qual.NonNull; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; import org.junit.runner.Description; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.junit.runners.model.Statement; +@RunWith(Parameterized.class) public final class CoalescingBulkloaderTest { + private final Function, Stream>, CoalescingBulkloader> cbl; + + public CoalescingBulkloaderTest(Function, Stream>, CoalescingBulkloader> cbl) { + this.cbl = cbl; + } + + private final static int maxLoadSize = 10; + private final static int maxDelay = 50; + private final static int delta = 5; + private final static int actualLoadTime = 20; + + @Parameters + public static List, Stream>, CoalescingBulkloader>> loaderTypes() { + return Arrays.asList( + fun -> byOrder(maxLoadSize, maxDelay, + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints))), + fun -> byMap(maxLoadSize, maxDelay, + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints).collect(toMap(identity(), identity())))), + fun -> byExtraction(maxLoadSize, maxDelay, identity(), identity(), + ints -> CompletableFuture.supplyAsync(() -> fun.apply(ints))) + ); + } + @NonNull private AsyncLoadingCache createCache(AtomicInteger loaderCalled) { - return Caffeine.newBuilder().buildAsync( - new CoalescingBulkloader(10, 50) { - @Override protected void load(Map> toLoad) { - ForkJoinPool.commonPool().execute(() -> { - loaderCalled.incrementAndGet(); - try { - Thread.sleep(20); - toLoad.forEach((k, v) -> v.complete(k)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - } - }); + return Caffeine.newBuilder().buildAsync(cbl.apply(ints -> { + loaderCalled.incrementAndGet(); + try { + Thread.sleep(actualLoadTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return ints; + })); } @Test - public void maxDelayIsNotMissedTooMuch() throws ExecutionException, InterruptedException { + public void maxDelayIsNotMissedTooMuch() throws InterruptedException { AtomicInteger loaderCalled = new AtomicInteger(0); final AsyncLoadingCache cache = createCache(loaderCalled); // a cache get won't take too long final CompletableFuture result = cache.get(1); Awaitility.await().pollThread(Thread::new).pollInterval(1, MILLISECONDS) - .between(45, MILLISECONDS, 55, MILLISECONDS) + .between(maxDelay - delta, MILLISECONDS, maxDelay + delta, MILLISECONDS) .untilAtomic(loaderCalled, is(1)); - assertTrue("delay in load", !result.isDone()); - Thread.sleep(20); + assertFalse("delay in load", result.isDone()); + Thread.sleep(actualLoadTime); assertThat(result.getNow(0), is(1)); } @@ -75,35 +103,35 @@ public void whenEnoughKeysAreRequestedTheLoadWillHappenImmediately() throws Inte AtomicInteger loaderCalled = new AtomicInteger(0); final AsyncLoadingCache cache = createCache(loaderCalled); - CompletableFuture[] results = new CompletableFuture[10]; - for (int i = 0; i < 9; i++) + CompletableFuture[] results = new CompletableFuture[maxLoadSize]; + for (int i = 0; i < maxLoadSize - 1; i++) results[i] = cache.get(i); - Thread.sleep(5); + Thread.sleep(delta); // requesting 9 keys does not trigger a load assertThat(loaderCalled.get(), is(0)); - for (int i = 0; i < 9; i++) { + for (int i = 0; i < maxLoadSize - 1; i++) { final CompletableFuture result = cache.get(i); assertThat(result, sameInstance(results[i])); - assertTrue("no load therefore unknown result", !result.isDone()); + assertFalse("no load therefore unknown result", result.isDone()); } - Thread.sleep(5); + Thread.sleep(delta); // requesting the same 9 keys still doesn't trigger a load assertThat(loaderCalled.get(), is(0)); - // requesting the 10 key will trigger immediately - results[9] = cache.get(9); + // requesting one more key will trigger immediately + results[maxLoadSize - 1] = cache.get(maxLoadSize - 1); Awaitility.await().pollInterval(1, MILLISECONDS) - .atMost(15, MILLISECONDS) + .atMost(delta, MILLISECONDS) .untilAtomic(loaderCalled, is(Integer.valueOf(1))); // values are not immediately available because of the sleep in the loader - for (int i = 0; i < 10; i++) { + for (int i = 0; i < maxLoadSize; i++) { assertThat(results[i].getNow(-1), is(-1)); } - Thread.sleep(20); + Thread.sleep(actualLoadTime + delta); // slept enough - for (int i = 0; i < 10; i++) { + for (int i = 0; i < maxLoadSize; i++) { assertThat(results[i].getNow(-1), is(i)); } @@ -116,20 +144,20 @@ public void whenEnoughKeysAreRequestedTheLoadWillHappenImmediately() throws Inte @Override public void evaluate() throws Throwable { - trie(3); + try_(3); } - void trie(int tries) throws Throwable { - try { - base.evaluate(); - } catch (Throwable throwable) { - System.err.println(description.getDisplayName() + " failed, " + (tries - 1) + " attempts left."); - if(tries>1) - trie(tries-1); - else { - throw throwable; - } + void try_(int tries) throws Throwable { + try { + base.evaluate(); + } catch (Throwable throwable) { + System.err.println(description.getDisplayName() + " failed, " + (tries - 1) + " attempts left."); + if (tries > 1) + try_(tries - 1); + else { + throw throwable; } + } } }; From 10dc59c29705bf43c2bd690e9a558a1e90b67cbb Mon Sep 17 00:00:00 2001 From: "Guus C. Bloemsma" Date: Tue, 2 Jul 2019 14:55:40 +0200 Subject: [PATCH 3/4] loadtime? --- .../examples/coalescing/bulkloader/CoalescingBulkloader.java | 2 +- .../coalescing/bulkloader/CoalescingBulkloaderTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java index 27433101df..5c2087e89b 100644 --- a/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java +++ b/examples/coalescing-bulkloader/src/main/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloader.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 Wim Deblauwe. All Rights Reserved. + * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java index fd349476e3..8d00f5de81 100644 --- a/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java +++ b/examples/coalescing-bulkloader/src/test/java/com/github/benmanes/caffeine/examples/coalescing/bulkloader/CoalescingBulkloaderTest.java @@ -1,5 +1,5 @@ /* - * Copyright actualLoadTime19 Guus C. Bloemsma. All Rights Reserved. + * Copyright 2019 Guus C. Bloemsma. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From d59d8dc35a98bd24970f3aa9105c5f78a63369dd Mon Sep 17 00:00:00 2001 From: "Guus C. Bloemsma" Date: Wed, 21 Aug 2019 14:31:42 +0200 Subject: [PATCH 4/4] let's see wether the test fails in travis as well --- travis.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/travis.sh b/travis.sh index f53bf2aa23..9585549824 100755 --- a/travis.sh +++ b/travis.sh @@ -23,6 +23,7 @@ case "${1:?''}" in run "./gradlew spotbugsJavaPoet spotbugsMain pmdJavaPoet pmdMain -Dspotbugs -Dpmd --console plain" run "sh -c 'cd examples/stats-metrics && ./gradlew test --console plain'" run "sh -c 'cd examples/write-behind-rxjava && mvn test'" + run "sh -c 'cd examples/coalescing-bulkloader && mvn test'" ;; tests) run "./gradlew check --console plain"