Skip to content

Commit

Permalink
Merge #3669 into 3.6.2
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Dec 15, 2023
2 parents 0b8f69f + 3360492 commit 4e6f40e
Showing 1 changed file with 91 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package reactor.core.observability.micrometer;

import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -331,67 +331,111 @@ void workerSchedulePeriodicallyIsCorrectlyMetered() throws InterruptedException
}

@Test
void pendingTaskRemovedOnScheduleRejection() {
CountDownLatch cdl = new CountDownLatch(1);
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
void pendingTaskRemovedOnScheduleRejection() throws InterruptedException {
CountDownLatch activeTaskLatch = new CountDownLatch(1);
CountDownLatch pendingTaskLatch = new CountDownLatch(1);
CountDownLatch countPendingLatch = new CountDownLatch(1);
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1));
Scheduler original = Schedulers.fromExecutorService(executorService);
TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty());
testScheduler.init();

RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending");
LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();

Runnable supp = () -> {
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
try {
Runnable activeTask = () -> {
try {
countPendingLatch.countDown();
activeTaskLatch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Runnable pendingTask = pendingTaskLatch::countDown;
Runnable rejectedTask = () -> {};

// Schedule two tasks: one will execute, the other will wait in the queue
assertThatNoException().isThrownBy(() -> testScheduler.schedule(activeTask));
assertThatNoException().isThrownBy(() -> testScheduler.schedule(pendingTask));

assertThatNoException().isThrownBy(() -> testScheduler.schedule(supp));
assertThatNoException().isThrownBy(() -> testScheduler.schedule(supp));
assertThat(longTaskTimer.activeTasks()).as("longTaskTimer.activeTasks()").isOne();
assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(() -> testScheduler.schedule(supp));
assertThatExceptionOfType(RejectedExecutionException.class)
.isThrownBy(() -> testScheduler.schedule(supp, 0, TimeUnit.SECONDS));
// Wait till first one is picked up -> exactly one is pending now
countPendingLatch.await(1, TimeUnit.SECONDS);
assertThat(longTaskTimer.activeTasks()).as("active pending")
.isOne();

cdl.countDown();
assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(
() -> testScheduler.schedule(rejectedTask));
assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(
() -> testScheduler.schedule(rejectedTask, 0, TimeUnit.SECONDS));

assertThat(longTaskTimer.activeTasks())
.as("longTaskTimer.activeTasks()")
.isZero();
activeTaskLatch.countDown();
pendingTaskLatch.await(1, TimeUnit.SECONDS);

assertThat(longTaskTimer.activeTasks()).as("active pending")
.isZero();
} finally {
testScheduler.disposeGracefully().block(Duration.ofSeconds(1));
}
}

@Test
void workerPendingTaskRemovedOnScheduleRejection() {
CountDownLatch cdl = new CountDownLatch(1);
ExecutorService executorService = new ThreadPoolExecutor(1, 2, 0L, TimeUnit.MILLISECONDS,
new SynchronousQueue<>());
void workerPendingTaskRemovedOnScheduleRejection() throws InterruptedException {
CountDownLatch activeTaskLatch = new CountDownLatch(1);
CountDownLatch pendingTaskLatch = new CountDownLatch(1);
CountDownLatch countPendingLatch = new CountDownLatch(1);
ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1));
Scheduler original = Schedulers.fromExecutorService(executorService);
TimedScheduler testScheduler = new TimedScheduler(original, registry, "test", Tags.empty());
RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending");
LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();
testScheduler.init();
Scheduler.Worker worker = testScheduler.createWorker();

Runnable supp = () -> {
try {
cdl.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

assertThatNoException().isThrownBy(() -> worker.schedule(supp));
assertThatNoException().isThrownBy(() -> worker.schedule(supp));
assertThat(longTaskTimer.activeTasks()).as("longTaskTimer.activeTasks()").isOne();
assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(() -> worker.schedule(supp));
assertThatExceptionOfType(RejectedExecutionException.class)
.isThrownBy(() -> worker.schedule(supp, 0, TimeUnit.SECONDS));

cdl.countDown();
RequiredSearch requiredSearch = registry.get("test.scheduler.tasks.pending");
LongTaskTimer longTaskTimer = requiredSearch.longTaskTimer();

assertThat(longTaskTimer.activeTasks())
.as("longTaskTimer.activeTasks()")
.isZero();
try {
Runnable activeTask = () -> {
try {
countPendingLatch.countDown();
activeTaskLatch.await();
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
};

Runnable pendingTask = pendingTaskLatch::countDown;
Runnable rejectedTask = () -> {
};

// Schedule two tasks: one will execute, the other will wait in the queue
assertThatNoException().isThrownBy(() -> worker.schedule(activeTask));
assertThatNoException().isThrownBy(() -> worker.schedule(pendingTask));

// Wait till first one is picked up -> exactly one is pending now
countPendingLatch.await(1, TimeUnit.SECONDS);
assertThat(longTaskTimer.activeTasks()).as("active pending")
.isOne();

assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(
() -> worker.schedule(rejectedTask));
assertThatExceptionOfType(RejectedExecutionException.class).isThrownBy(
() -> worker.schedule(rejectedTask, 0, TimeUnit.SECONDS));

activeTaskLatch.countDown();
pendingTaskLatch.await(1, TimeUnit.SECONDS);

assertThat(longTaskTimer.activeTasks()).as("active pending")
.isZero();
}
finally {
worker.dispose();
testScheduler.disposeGracefully()
.block(Duration.ofSeconds(1));
}
}
}

0 comments on commit 4e6f40e

Please sign in to comment.