From 0e075ee6d65602e119afbbe8a30ab12ee7ab3dda Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 25 Jul 2024 11:05:50 +0200 Subject: [PATCH] Scheduler: make sure an exception never slips through an invokers chain - related to #41240 --- .../test/ConcurrentExecutionSkipTest.java | 21 +++++++++++++++++++ .../common/runtime/DefaultInvoker.java | 8 ++++--- .../common/runtime/DelegateInvoker.java | 13 ++++++++++++ .../common/runtime/InstrumentedInvoker.java | 7 +------ .../SkipConcurrentExecutionInvoker.java | 2 +- .../common/runtime/SkipPredicateInvoker.java | 2 +- .../common/runtime/StatusEmitterInvoker.java | 2 +- .../test/ConcurrentExecutionSkipTest.java | 21 +++++++++++++++++++ 8 files changed, 64 insertions(+), 12 deletions(-) diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java index 7e58c0c63a3d3..b7c25fa6435ad 100644 --- a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/ConcurrentExecutionSkipTest.java @@ -2,6 +2,7 @@ import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.util.concurrent.CountDownLatch; @@ -13,6 +14,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.SkippedExecution; import io.quarkus.scheduler.SuccessfulExecution; @@ -39,6 +41,10 @@ public void testExecution() { } else { fail("Jobs were not executed in 10 seconds!"); } + + assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.FAILURE_COUNTER.get() > 0); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); @@ -50,8 +56,11 @@ static class Jobs { static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); static final AtomicInteger COUNTER = new AtomicInteger(0); + static final AtomicInteger FAILING_COUNTER = new AtomicInteger(0); static final AtomicInteger SUCCESS_COUNTER = new AtomicInteger(0); + static final AtomicInteger FAILURE_COUNTER = new AtomicInteger(0); static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1); + static final CountDownLatch FAILED_LATCH = new CountDownLatch(1); @Scheduled(every = "1s", concurrentExecution = SKIP) void nonconcurrent() throws InterruptedException { @@ -61,6 +70,14 @@ void nonconcurrent() throws InterruptedException { } } + @Scheduled(every = "1s", concurrentExecution = SKIP) + void failing() { + if (FAILING_COUNTER.incrementAndGet() > 2) { + FAILED_LATCH.countDown(); + } + throw new IllegalStateException(); + } + void onSkip(@Observes SkippedExecution event) { SKIPPED_LATCH.countDown(); } @@ -68,5 +85,9 @@ void onSkip(@Observes SkippedExecution event) { void onSuccess(@Observes SuccessfulExecution event) { SUCCESS_COUNTER.incrementAndGet(); } + + void onFailure(@Observes FailedExecution event) { + FAILURE_COUNTER.incrementAndGet(); + } } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java index c587b0c056e32..cd9abbadbf656 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DefaultInvoker.java @@ -1,5 +1,6 @@ package io.quarkus.scheduler.common.runtime; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import io.quarkus.arc.Arc; @@ -24,10 +25,10 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti return invokeBean(execution).whenComplete((v, t) -> { requestContext.destroy(state); }); - } catch (RuntimeException e) { - // Just terminate the context and rethrow the exception if something goes really wrong + } catch (Throwable e) { + // Terminate the context and return a failed stage if something goes really wrong requestContext.terminate(); - throw e; + return CompletableFuture.failedStage(e); } finally { // Always deactivate the context requestContext.deactivate(); @@ -35,6 +36,7 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti } } + // This method is generated and should never throw an exception protected abstract CompletionStage invokeBean(ScheduledExecution execution); } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java index ff7571ab10352..a2245862d7fb0 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/DelegateInvoker.java @@ -1,5 +1,10 @@ package io.quarkus.scheduler.common.runtime; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import io.quarkus.scheduler.ScheduledExecution; + abstract class DelegateInvoker implements ScheduledInvoker { protected final ScheduledInvoker delegate; @@ -17,4 +22,12 @@ public boolean isBlocking() { public boolean isRunningOnVirtualThread() { return delegate.isRunningOnVirtualThread(); } + + protected CompletionStage invokeDelegate(ScheduledExecution execution) { + try { + return delegate.invoke(execution); + } catch (Throwable e) { + return CompletableFuture.failedStage(e); + } + } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java index c0e330001f436..5cb2721bd6435 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/InstrumentedInvoker.java @@ -1,6 +1,5 @@ package io.quarkus.scheduler.common.runtime; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import io.quarkus.scheduler.ScheduledExecution; @@ -26,11 +25,7 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti @Override public CompletionStage executeJob() { - try { - return delegate.invoke(execution); - } catch (Exception e) { - return CompletableFuture.failedFuture(e); - } + return invokeDelegate(execution); } @Override diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipConcurrentExecutionInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipConcurrentExecutionInvoker.java index fc4a21906dbfb..6171099279cd5 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipConcurrentExecutionInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipConcurrentExecutionInvoker.java @@ -34,7 +34,7 @@ public SkipConcurrentExecutionInvoker(ScheduledInvoker delegate, Event invoke(ScheduledExecution execution) throws Exception { if (running.compareAndSet(false, true)) { - return delegate.invoke(execution).whenComplete((r, t) -> running.set(false)); + return invokeDelegate(execution).whenComplete((r, t) -> running.set(false)); } LOG.debugf("Skipped scheduled invoker execution: %s", delegate.getClass().getName()); SkippedExecution payload = new SkippedExecution(execution, diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipPredicateInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipPredicateInvoker.java index 7daf2c80234fc..90f7c7f9f2834 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipPredicateInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SkipPredicateInvoker.java @@ -40,7 +40,7 @@ public CompletionStage invoke(ScheduledExecution execution) throws Excepti event.fireAsync(payload); return CompletableFuture.completedStage(null); } else { - return delegate.invoke(execution); + return invokeDelegate(execution); } } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/StatusEmitterInvoker.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/StatusEmitterInvoker.java index 2a06be45171b5..20797e83405ee 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/StatusEmitterInvoker.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/StatusEmitterInvoker.java @@ -32,7 +32,7 @@ public StatusEmitterInvoker(ScheduledInvoker delegate, Event invoke(ScheduledExecution execution) throws Exception { - return delegate.invoke(execution).whenComplete((v, t) -> { + return invokeDelegate(execution).whenComplete((v, t) -> { if (t != null) { LOG.errorf(t, "Error occurred while executing task for trigger %s", execution.getTrigger()); Events.fire(failedEvent, new FailedExecution(execution, t)); diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java index 27290798d8c4a..5c00b7181d4fc 100644 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/ConcurrentExecutionSkipTest.java @@ -2,6 +2,7 @@ import static io.quarkus.scheduler.Scheduled.ConcurrentExecution.SKIP; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import java.util.concurrent.CountDownLatch; @@ -13,6 +14,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.quarkus.scheduler.FailedExecution; import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.SkippedExecution; import io.quarkus.scheduler.SuccessfulExecution; @@ -39,6 +41,10 @@ public void testExecution() { } else { fail("Jobs were not executed in 10 seconds!"); } + + assertTrue(Jobs.FAILED_LATCH.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.FAILURE_COUNTER.get() > 0); + } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); @@ -50,8 +56,11 @@ static class Jobs { static final CountDownLatch BLOCKING_LATCH = new CountDownLatch(1); static final AtomicInteger COUNTER = new AtomicInteger(0); + static final AtomicInteger FAILING_COUNTER = new AtomicInteger(0); static final AtomicInteger SUCCESS_COUNTER = new AtomicInteger(0); + static final AtomicInteger FAILURE_COUNTER = new AtomicInteger(0); static final CountDownLatch SKIPPED_LATCH = new CountDownLatch(1); + static final CountDownLatch FAILED_LATCH = new CountDownLatch(1); @Scheduled(every = "1s", concurrentExecution = SKIP) void nonconcurrent() throws InterruptedException { @@ -61,6 +70,14 @@ void nonconcurrent() throws InterruptedException { } } + @Scheduled(every = "1s", concurrentExecution = SKIP) + void failing() { + if (FAILING_COUNTER.incrementAndGet() > 2) { + FAILED_LATCH.countDown(); + } + throw new IllegalStateException(); + } + void onSkip(@Observes SkippedExecution event) { SKIPPED_LATCH.countDown(); } @@ -68,5 +85,9 @@ void onSkip(@Observes SkippedExecution event) { void onSuccess(@Observes SuccessfulExecution event) { SUCCESS_COUNTER.incrementAndGet(); } + + void onFailure(@Observes FailedExecution event) { + FAILURE_COUNTER.incrementAndGet(); + } } }