From 891ad8bf33d468896f8b89632a58575fee8da82f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Mon, 27 Nov 2023 10:32:12 +0100 Subject: [PATCH] Fix TestEvictableCache.testLoadFailure flakiness The test relied on sleep for synchronization. --- .../java/io/trino/cache/TestEmptyCache.java | 25 +++++++++++++------ .../io/trino/cache/TestEvictableCache.java | 23 +++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/lib/trino-cache/src/test/java/io/trino/cache/TestEmptyCache.java b/lib/trino-cache/src/test/java/io/trino/cache/TestEmptyCache.java index 0572a9e3ab6e..4e26d40ae36d 100644 --- a/lib/trino-cache/src/test/java/io/trino/cache/TestEmptyCache.java +++ b/lib/trino-cache/src/test/java/io/trino/cache/TestEmptyCache.java @@ -20,12 +20,13 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import static io.trino.testing.assertions.Assert.assertEventually; import static java.util.concurrent.Executors.newFixedThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.assertThat; @@ -48,17 +49,25 @@ public void testLoadFailure() ExecutorService executor = newFixedThreadPool(2); try { - AtomicBoolean first = new AtomicBoolean(true); - CyclicBarrier barrier = new CyclicBarrier(2); + Exchanger exchanger = new Exchanger<>(); + CountDownLatch secondUnblocked = new CountDownLatch(1); List> futures = new ArrayList<>(); for (int i = 0; i < 2; i++) { + boolean first = i == 0; futures.add(executor.submit(() -> { - barrier.await(10, SECONDS); + if (!first) { + // Wait for the first one to start the call + exchanger.exchange(Thread.currentThread(), 10, SECONDS); + // Prove that we are back in RUNNABLE state. + secondUnblocked.countDown(); + } return cache.get(key, () -> { - if (first.compareAndSet(true, false)) { - // first - Thread.sleep(1); // increase chances that second thread calls cache.get before we return + if (first) { + Thread secondThread = exchanger.exchange(null, 10, SECONDS); + assertThat(secondUnblocked.await(10, SECONDS)).isTrue(); + // Wait for the second one to hang inside the cache.get call. + assertEventually(() -> assertThat(secondThread.getState()).isNotEqualTo(Thread.State.RUNNABLE)); throw new RuntimeException("first attempt is poised to fail"); } return "success"; diff --git a/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java b/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java index 5fd8d27e2ba5..0c589503bea7 100644 --- a/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java +++ b/lib/trino-cache/src/test/java/io/trino/cache/TestEvictableCache.java @@ -31,11 +31,11 @@ import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @@ -44,6 +44,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.cache.CacheStatsAssertions.assertCacheStats; +import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.Math.toIntExact; import static java.lang.String.format; import static java.util.concurrent.Executors.newFixedThreadPool; @@ -300,17 +301,25 @@ public void testLoadFailure() ExecutorService executor = newFixedThreadPool(2); try { - AtomicBoolean first = new AtomicBoolean(true); - CyclicBarrier barrier = new CyclicBarrier(2); + Exchanger exchanger = new Exchanger<>(); + CountDownLatch secondUnblocked = new CountDownLatch(1); List> futures = new ArrayList<>(); for (int i = 0; i < 2; i++) { + boolean first = i == 0; futures.add(executor.submit(() -> { - barrier.await(10, SECONDS); + if (!first) { + // Wait for the first one to start the call + exchanger.exchange(Thread.currentThread(), 10, SECONDS); + // Prove that we are back in RUNNABLE state. + secondUnblocked.countDown(); + } return cache.get(key, () -> { - if (first.compareAndSet(true, false)) { - // first - Thread.sleep(1); // increase chances that second thread calls cache.get before we return + if (first) { + Thread secondThread = exchanger.exchange(null, 10, SECONDS); + assertThat(secondUnblocked.await(10, SECONDS)).isTrue(); + // Wait for the second one to hang inside the cache.get call. + assertEventually(() -> assertThat(secondThread.getState()).isNotEqualTo(Thread.State.RUNNABLE)); throw new RuntimeException("first attempt is poised to fail"); } return "success";