diff --git a/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java index b8242e68ac..b0b16fe8fb 100644 --- a/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java +++ b/reactor-core-micrometer/src/test/java/reactor/core/observability/micrometer/TimedSchedulerTest.java @@ -17,10 +17,10 @@ package reactor.core.observability.micrometer; import java.time.Duration; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -331,67 +331,111 @@ void workerSchedulePeriodicallyIsCorrectlyMetered() throws InterruptedException } @Test - void pendingTaskRemovedOnScheduleRejection() { - CountDownLatch cdl = new CountDownLatch(1); - ExecutorService executorService = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS, - new SynchronousQueue<>()); + void pendingTaskRemovedOnScheduleRejection() throws InterruptedException { + CountDownLatch activeTaskLatch = new CountDownLatch(1); + CountDownLatch pendingTaskLatch = new CountDownLatch(1); + CountDownLatch countPendingLatch = new CountDownLatch(1); + ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1)); Scheduler original = Schedulers.fromExecutorService(executorService); TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty()); + testScheduler.init(); + RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending"); LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer(); - Runnable supp = () -> { - try { - cdl.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }; + try { + Runnable activeTask = () -> { + try { + countPendingLatch.countDown(); + activeTaskLatch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + Runnable pendingTask = pendingTaskLatch::countDown; + Runnable rejectedTask = () -> {}; + + // Schedule two tasks: one will execute, the other will wait in the queue + assertThatNoException().isThrownBy(() -> testScheduler.schedule(activeTask)); + assertThatNoException().isThrownBy(() -> testScheduler.schedule(pendingTask)); - assertThatNoException().isThrownBy(() -> testScheduler.schedule(supp)); - assertThatNoException().isThrownBy(() -> testScheduler.schedule(supp)); - assertThat(longTaskTimer.activeTasks()).as("longTaskTimer.activeTasks()").isOne(); - assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(() -> testScheduler.schedule(supp)); - assertThatExceptionOfType(RejectedExecutionException.class) - .isThrownBy(() -> testScheduler.schedule(supp, 0, TimeUnit.SECONDS)); + // Wait till first one is picked up -> exactly one is pending now + countPendingLatch.await(1, TimeUnit.SECONDS); + assertThat(longTaskTimer.activeTasks()).as("active pending") + .isOne(); - cdl.countDown(); + assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy( + () -> testScheduler.schedule(rejectedTask)); + assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy( + () -> testScheduler.schedule(rejectedTask, 0, TimeUnit.SECONDS)); - assertThat(longTaskTimer.activeTasks()) - .as("longTaskTimer.activeTasks()") - .isZero(); + activeTaskLatch.countDown(); + pendingTaskLatch.await(1, TimeUnit.SECONDS); + + assertThat(longTaskTimer.activeTasks()).as("active pending") + .isZero(); + } finally { + testScheduler.disposeGracefully().block(Duration.ofSeconds(1)); + } } @Test - void workerPendingTaskRemovedOnScheduleRejection() { - CountDownLatch cdl = new CountDownLatch(1); - ExecutorService executorService = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS, - new SynchronousQueue<>()); + void workerPendingTaskRemovedOnScheduleRejection() throws InterruptedException { + CountDownLatch activeTaskLatch = new CountDownLatch(1); + CountDownLatch pendingTaskLatch = new CountDownLatch(1); + CountDownLatch countPendingLatch = new CountDownLatch(1); + ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(1)); Scheduler original = Schedulers.fromExecutorService(executorService); TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty()); - RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending"); - LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer(); + testScheduler.init(); Scheduler.Worker worker = testScheduler.createWorker(); - Runnable supp = () -> { - try { - cdl.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }; - - assertThatNoException().isThrownBy(() -> worker.schedule(supp)); - assertThatNoException().isThrownBy(() -> worker.schedule(supp)); - assertThat(longTaskTimer.activeTasks()).as("longTaskTimer.activeTasks()").isOne(); - assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(() -> worker.schedule(supp)); - assertThatExceptionOfType(RejectedExecutionException.class) - .isThrownBy(() -> worker.schedule(supp, 0, TimeUnit.SECONDS)); - - cdl.countDown(); + RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending"); + LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer(); - assertThat(longTaskTimer.activeTasks()) - .as("longTaskTimer.activeTasks()") - .isZero(); + try { + Runnable activeTask = () -> { + try { + countPendingLatch.countDown(); + activeTaskLatch.await(); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + + Runnable pendingTask = pendingTaskLatch::countDown; + Runnable rejectedTask = () -> { + }; + + // Schedule two tasks: one will execute, the other will wait in the queue + assertThatNoException().isThrownBy(() -> worker.schedule(activeTask)); + assertThatNoException().isThrownBy(() -> worker.schedule(pendingTask)); + + // Wait till first one is picked up -> exactly one is pending now + countPendingLatch.await(1, TimeUnit.SECONDS); + assertThat(longTaskTimer.activeTasks()).as("active pending") + .isOne(); + + assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy( + () -> worker.schedule(rejectedTask)); + assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy( + () -> worker.schedule(rejectedTask, 0, TimeUnit.SECONDS)); + + activeTaskLatch.countDown(); + pendingTaskLatch.await(1, TimeUnit.SECONDS); + + assertThat(longTaskTimer.activeTasks()).as("active pending") + .isZero(); + } + finally { + worker.dispose(); + testScheduler.disposeGracefully() + .block(Duration.ofSeconds(1)); + } } } \ No newline at end of file