From d4bd15a14e1f8ae901ab6e356cbebf168fccac8f Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Wed, 24 Jul 2024 17:26:22 +0200 Subject: [PATCH] Scheduler: run multiple scheduler implementations - related to #41954 --- .../main/asciidoc/scheduler-reference.adoc | 26 ++- .../quartz/deployment/QuartzProcessor.java | 10 +- .../composite/CompositeJobDefinitionTest.java | 108 ++++++++++ .../CompositeSchedulerNotUsedTest.java | 36 ++++ .../composite/CompositeSchedulerTest.java | 99 +++++++++ .../quartz/runtime/QuartzSchedulerImpl.java | 15 +- .../java/io/quarkus/scheduler/Scheduled.java | 46 +++++ .../java/io/quarkus/scheduler/Scheduler.java | 21 ++ .../common/runtime/AbstractJobDefinition.java | 34 ++-- .../common/runtime/SchedulerContext.java | 8 + .../common/runtime/SyntheticScheduled.java | 12 +- .../runtime/util/SchedulerUtilsTest.java | 6 + .../runtime/util/SyntheticScheduledTest.java | 3 +- .../DiscoveredImplementationsBuildItem.java | 63 ++++++ .../SchedulerImplementationBuildItem.java | 52 +++++ .../deployment/SchedulerProcessor.java | 90 +++++++- ...edulerImplementationNotDiscoveredTest.java | 36 ++++ .../scheduler/runtime/CompositeScheduler.java | 192 ++++++++++++++++++ .../scheduler/runtime/Constituent.java | 26 +++ .../scheduler/runtime/SchedulerConfig.java | 12 ++ .../scheduler/runtime/SchedulerRecorder.java | 28 ++- .../scheduler/runtime/SimpleScheduler.java | 15 +- 22 files changed, 904 insertions(+), 34 deletions(-) create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeJobDefinitionTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerNotUsedTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerTest.java create mode 100644 extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/DiscoveredImplementationsBuildItem.java create mode 100644 extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerImplementationBuildItem.java create mode 100644 extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/composite/SchedulerImplementationNotDiscoveredTest.java create mode 100644 extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java create mode 100644 extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/Constituent.java diff --git a/docs/src/main/asciidoc/scheduler-reference.adoc b/docs/src/main/asciidoc/scheduler-reference.adoc index ebfa3a4f34540..943ff8319b1fe 100644 --- a/docs/src/main/asciidoc/scheduler-reference.adoc +++ b/docs/src/main/asciidoc/scheduler-reference.adoc @@ -100,7 +100,7 @@ For example `"0 15 10 * * ?"` fires at 10:15am every day. [source,java] ---- @Scheduled(cron = "0 15 10 * * ?") -void fireAt10AmEveryDay() { } +void fireAt1015AmEveryDay() { } ---- The syntax used in CRON expressions is controlled by the `quarkus.scheduler.cron-type` property. @@ -359,6 +359,30 @@ class Jobs { ---- <1> The return type `Uni` instructs the scheduler to execute the method on the Vert.x event loop. +=== How to use multiple scheduler implementations + +In some cases, it might be useful to choose a scheduler implementation used to execute a scheduled method. +However, only one `Scheduler` implementation is used for all scheduled methods by default. +For example, the `quarkus-quartz` extension provides an implementation that supports clustering but it also removes the simple in-memory implementation from the game. +Now, if clustering is enabled then it's not possible to define a scheduled method that would be executed locally on a single node. +Nevertheless, if you set the `quarkus.scheduler.use-composite-scheduler` config property to `true` then a composite `Scheduler` is used instead. +This means that multiple scheduler implementations are kept running side by side. +Furthermore, it's possible to chose a specific implementation used to execute a scheduled method using `@Scheduled#executeWith()`. + +[source,java] +---- +class Jobs { + + @Scheduled(cron = "0 15 10 * * ?") <1> + void fireAt10AmEveryDay() { } + + @Scheduled(every = "1s", executeWith = Scheduled.SIMPLE) <2> + void everySecond() { } +} +---- +<1> If the `quarkus-quartz` extension is present then this method will be executed with the Quartz-specific scheduler. +<2> If `quarkus.scheduler.use-composite-scheduler=true` is set then this method will be executed with the simple in-memory implementation provided by the `quarkus-scheduler` extension. + == Scheduler Quarkus provides a built-in bean of type `io.quarkus.scheduler.Scheduler` that can be injected and used to pause/resume the scheduler and individual scheduled methods identified by a specific `Scheduled#identity()`. diff --git a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java index 30f0727e9c68b..c74d35b710596 100644 --- a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java +++ b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java @@ -64,10 +64,9 @@ import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate; import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate; import io.quarkus.runtime.configuration.ConfigurationException; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem; -/** - * - */ public class QuartzProcessor { private static final DotName JOB = DotName.createSimple(Job.class.getName()); @@ -77,6 +76,11 @@ FeatureBuildItem feature() { return new FeatureBuildItem(Feature.QUARTZ); } + @BuildStep + SchedulerImplementationBuildItem implementation() { + return new SchedulerImplementationBuildItem(Scheduled.QUARTZ, DotName.createSimple(QuartzSchedulerImpl.class), 1); + } + @BuildStep AdditionalBeanBuildItem beans() { return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class); diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeJobDefinitionTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeJobDefinitionTest.java new file mode 100644 index 0000000000000..3f0a82a31277f --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeJobDefinitionTest.java @@ -0,0 +1,108 @@ +package io.quarkus.quartz.test.composite; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.quarkus.scheduler.runtime.Constituent; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.Identifier; + +public class CompositeJobDefinitionTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + }) + .overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true") + .overrideConfigKey("quarkus.scheduler.start-mode", "forced"); + + @Constituent + QuartzScheduler quartz; + + @Constituent + @Identifier("SIMPLE") + Scheduler simple; + + @Inject + Scheduler composite; + + static CountDownLatch simpleLatch = new CountDownLatch(1); + static CountDownLatch quartzLatch = new CountDownLatch(1); + static CountDownLatch autoLatch = new CountDownLatch(1); + + static void reset() { + simpleLatch = new CountDownLatch(1); + quartzLatch = new CountDownLatch(1); + autoLatch = new CountDownLatch(1); + } + + @Test + public void testExecution() throws InterruptedException { + + assertEquals("Scheduler implementation not available: bar", + assertThrows(IllegalArgumentException.class, () -> composite.newJob("foo").setExecuteWith("bar")).getMessage()); + + composite.newJob("simple") + .setInterval("1s") + .setExecuteWith(Scheduled.SIMPLE) + .setTask(se -> { + simpleLatch.countDown(); + }).schedule(); + + composite.newJob("quartz") + .setInterval("1s") + .setExecuteWith(Scheduled.QUARTZ) + .setTask(se -> { + quartzLatch.countDown(); + }).schedule(); + + composite.newJob("auto") + .setInterval("1s") + .setTask(se -> { + autoLatch.countDown(); + }).schedule(); + + assertTrue(simpleLatch.await(5, TimeUnit.SECONDS)); + assertTrue(quartzLatch.await(5, TimeUnit.SECONDS)); + assertTrue(autoLatch.await(5, TimeUnit.SECONDS)); + + assertNull(quartz.getScheduledJob("simple")); + assertNotNull(quartz.getScheduledJob("quartz")); + assertNotNull(quartz.getScheduledJob("auto")); + + assertNotNull(simple.getScheduledJob("simple")); + assertNull(simple.getScheduledJob("quartz")); + assertNull(simple.getScheduledJob("auto")); + + assertNotNull(composite.getScheduledJob("quartz")); + assertNotNull(composite.getScheduledJob("auto")); + assertNotNull(composite.getScheduledJob("simple")); + + composite.pause(); + reset(); + assertFalse(composite.isRunning()); + assertFalse(simpleLatch.await(2, TimeUnit.SECONDS)); + + composite.resume(); + assertTrue(composite.isRunning()); + assertTrue(simpleLatch.await(5, TimeUnit.SECONDS)); + assertTrue(quartzLatch.await(5, TimeUnit.SECONDS)); + assertTrue(autoLatch.await(5, TimeUnit.SECONDS)); + } + +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerNotUsedTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerNotUsedTest.java new file mode 100644 index 0000000000000..08a8dd58a85b9 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerNotUsedTest.java @@ -0,0 +1,36 @@ +package io.quarkus.quartz.test.composite; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class CompositeSchedulerNotUsedTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .assertException(t -> { + assertThat(t).cause().isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The required scheduler implementation is not available because the composite scheduler is not used: SIMPLE"); + }); + + @Test + public void test() { + fail(); + } + + static class Jobs { + + @Scheduled(every = "1s", executeWith = Scheduled.SIMPLE) + void quartz() { + } + + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerTest.java new file mode 100644 index 0000000000000..d08607b05e317 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/composite/CompositeSchedulerTest.java @@ -0,0 +1,99 @@ +package io.quarkus.quartz.test.composite; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.quarkus.scheduler.runtime.Constituent; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.Identifier; + +public class CompositeSchedulerTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true"); + + @Constituent + QuartzScheduler quartz; + + @Constituent + @Identifier("SIMPLE") + Scheduler simple; + + @Inject + Scheduler composite; + + @Test + public void testExecution() throws InterruptedException { + assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS)); + + assertNull(quartz.getScheduledJob("simple")); + assertNotNull(quartz.getScheduledJob("quartz")); + assertNotNull(quartz.getScheduledJob("auto")); + + assertNotNull(simple.getScheduledJob("simple")); + assertNull(simple.getScheduledJob("quartz")); + assertNull(simple.getScheduledJob("auto")); + + assertNotNull(composite.getScheduledJob("quartz")); + assertNotNull(composite.getScheduledJob("auto")); + assertNotNull(composite.getScheduledJob("simple")); + + composite.pause(); + Jobs.reset(); + assertFalse(composite.isRunning()); + assertFalse(Jobs.simpleLatch.await(2, TimeUnit.SECONDS)); + + composite.resume(); + assertTrue(composite.isRunning()); + assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS)); + assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS)); + } + + static class Jobs { + + static CountDownLatch simpleLatch = new CountDownLatch(1); + static CountDownLatch quartzLatch = new CountDownLatch(1); + static CountDownLatch autoLatch = new CountDownLatch(1); + + static void reset() { + simpleLatch = new CountDownLatch(1); + quartzLatch = new CountDownLatch(1); + autoLatch = new CountDownLatch(1); + } + + @Scheduled(identity = "simple", every = "1s", executeWith = Scheduled.SIMPLE) + void simple() { + simpleLatch.countDown(); + } + + @Scheduled(identity = "quartz", every = "1s", executeWith = Scheduled.QUARTZ) + void quartz() { + quartzLatch.countDown(); + } + + @Scheduled(identity = "auto", every = "1s") + void auto() { + autoLatch.countDown(); + } + + } +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index 7e08b2a596de5..78c859eefe8ff 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -198,7 +198,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport if (!enabled) { LOGGER.info("Quartz scheduler is disabled by config property and will not be started"); this.scheduler = null; - } else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) { + } else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty() + && !context.forceSchedulerStart()) { LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started"); this.scheduler = null; } else { @@ -232,10 +233,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { } }; - for (ScheduledMethod method : context.getScheduledMethods()) { + for (ScheduledMethod method : context.getScheduledMethods(Scheduled.QUARTZ)) { int nameSequence = 0; for (Scheduled scheduled : method.getSchedules()) { + if (!context.matchesImplementation(scheduled, Scheduled.QUARTZ)) { + continue; + } String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity()); if (identity.isEmpty()) { identity = ++nameSequence + "_" + method.getInvokerClassName(); @@ -345,6 +349,11 @@ public org.quartz.Scheduler getScheduler() { return scheduler; } + @Override + public String implementation() { + return Scheduled.QUARTZ; + } + @Override public void pause() { if (!enabled) { @@ -893,7 +902,7 @@ public Trigger schedule() { } scheduled = true; SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed, - overdueGracePeriod, concurrentExecution, skipPredicate, timeZone); + overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation); return createJobDefinitionQuartzTrigger(this, scheduled, null); } diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduled.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduled.java index e13e64c127ae2..2ab9e35c74344 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduled.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduled.java @@ -62,6 +62,28 @@ */ String DEFAULT_TIMEZONE = "<>"; + /** + * Constant value for {@link #executeWith()} indicating that the implementation should be selected automatically, i.e. the + * implementation with highest priority is used. + */ + String AUTO = "<>"; + + /** + * Constant value for {@link #executeWith()} indicating that the simple in-memory implementation provided by the + * {@code quarkus-scheduler} extension should be used. + *

+ * This implementation has priority {@code 0}. + */ + String SIMPLE = "SIMPLE"; + + /** + * Constant value for {@link #executeWith()} indicating that the Quartz implementation provided by the + * {@code quarkus-quartz} extension should be used. + *

+ * This implementation has priority {@code 1}. + */ + String QUARTZ = "QUARTZ"; + /** * Optionally defines a unique identifier for this job. *

@@ -205,6 +227,30 @@ */ String timeZone() default DEFAULT_TIMEZONE; + /** + * Choose a scheduler implementation used to execute a scheduled method. + *

+ * Only one scheduler implementation is used for all scheduled methods by default. For example, the {@code quarkus-quartz} + * extension provides an implementation that supports clustering but it also removes the simple in-memory implementation + * from the game. + *

+ * If the {@code quarkus.scheduler.use-composite-scheduler} config property is set to {@code true} then a composite + * scheduler is used instead. This means that multiple scheduler implementations are kept running side by side. + * In this case, it's possible to choose a specific implementation used to execute a scheduled method. By default, the + * implementation with highest priority is selected automatically. + *

+ * If the {@code quarkus.scheduler.use-composite-scheduler} config property is set to {@code false} (default) and the + * required implementation is not the implementation with highest priority, then the build fails. + *

+ * In any case, if the required implementation is not available, then the build fails. + * + * @return the implementation to execute this scheduled method + * @see #AUTO + * @see #SIMPLE + * @see #QUARTZ + */ + String executeWith() default AUTO; + @Retention(RUNTIME) @Target(METHOD) @interface Schedules { diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java index f5dd65e7440c6..2d10922bf9ab3 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java @@ -44,10 +44,14 @@ public interface Scheduler { /** * Identity must not be null and {@code false} is returned for non-existent identity. + *

+ * Note that this method only returns {@code true} if the job was explicitly paused. I.e. it does not reflect a paused + * scheduler. * * @param identity * @return {@code true} if the job with the given identity is paused, {@code false} otherwise * @see Scheduled#identity() + * @see #pause(String) */ boolean isPaused(String identity); @@ -88,6 +92,13 @@ public interface Scheduler { */ Trigger unscheduleJob(String identity); + /** + * + * @return the implementation + * @see Scheduled#executeWith() + */ + String implementation(); + /** * The job definition is a builder-like API that can be used to define a job programmatically. *

@@ -177,6 +188,16 @@ interface JobDefinition { */ JobDefinition setTimeZone(String timeZone); + /** + * {@link Scheduled#executeWith()} + * + * @param implementation + * @return self + * @throws IllegalArgumentException If the composite scheduler is used and the selected implementation is not available + * @see Scheduled#executeWith() + */ + JobDefinition setExecuteWith(String implementation); + /** * * @param task diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java index eef7fe84d7c27..3feb514cd413e 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java @@ -1,5 +1,6 @@ package io.quarkus.scheduler.common.runtime; +import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; @@ -28,6 +29,7 @@ public abstract class AbstractJobDefinition implements JobDefinition { protected boolean scheduled = false; protected String timeZone = Scheduled.DEFAULT_TIMEZONE; protected boolean runOnVirtualThread; + protected String implementation = Scheduled.AUTO; public AbstractJobDefinition(String identity) { this.identity = identity; @@ -36,55 +38,63 @@ public AbstractJobDefinition(String identity) { @Override public JobDefinition setCron(String cron) { checkScheduled(); - this.cron = cron; + this.cron = Objects.requireNonNull(cron); return this; } @Override public JobDefinition setInterval(String every) { checkScheduled(); - this.every = every; + this.every = Objects.requireNonNull(every); return this; } @Override public JobDefinition setDelayed(String period) { checkScheduled(); - this.delayed = period; + this.delayed = Objects.requireNonNull(period); return this; } @Override public JobDefinition setConcurrentExecution(ConcurrentExecution concurrentExecution) { checkScheduled(); - this.concurrentExecution = concurrentExecution; + this.concurrentExecution = Objects.requireNonNull(concurrentExecution); return this; } @Override public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { checkScheduled(); - this.skipPredicate = skipPredicate; + this.skipPredicate = Objects.requireNonNull(skipPredicate); return this; } @Override public JobDefinition setSkipPredicate(Class skipPredicateClass) { - this.skipPredicateClass = skipPredicateClass; + checkScheduled(); + this.skipPredicateClass = Objects.requireNonNull(skipPredicateClass); return setSkipPredicate(SchedulerUtils.instantiateBeanOrClass(skipPredicateClass)); } @Override public JobDefinition setOverdueGracePeriod(String period) { checkScheduled(); - this.overdueGracePeriod = period; + this.overdueGracePeriod = Objects.requireNonNull(period); return this; } @Override public JobDefinition setTimeZone(String timeZone) { checkScheduled(); - this.timeZone = timeZone; + this.timeZone = Objects.requireNonNull(timeZone); + return this; + } + + @Override + public JobDefinition setExecuteWith(String implementation) { + checkScheduled(); + this.implementation = Objects.requireNonNull(implementation); return this; } @@ -94,14 +104,14 @@ public JobDefinition setTask(Consumer task, boolean runOnVir if (asyncTask != null) { throw new IllegalStateException("Async task was already set"); } - this.task = task; + this.task = Objects.requireNonNull(task); this.runOnVirtualThread = runOnVirtualThread; return this; } @Override public JobDefinition setTask(Class> taskClass, boolean runOnVirtualThread) { - this.taskClass = taskClass; + this.taskClass = Objects.requireNonNull(taskClass); return setTask(SchedulerUtils.instantiateBeanOrClass(taskClass), runOnVirtualThread); } @@ -111,13 +121,13 @@ public JobDefinition setAsyncTask(Function> asyncT if (task != null) { throw new IllegalStateException("Sync task was already set"); } - this.asyncTask = asyncTask; + this.asyncTask = Objects.requireNonNull(asyncTask); return this; } @Override public JobDefinition setAsyncTask(Class>> asyncTaskClass) { - this.asyncTaskClass = asyncTaskClass; + this.asyncTaskClass = Objects.requireNonNull(asyncTaskClass); return setAsyncTask(SchedulerUtils.instantiateBeanOrClass(asyncTaskClass)); } diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java index 5b2a406467a26..50cbe5387724f 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SchedulerContext.java @@ -5,6 +5,8 @@ import com.cronutils.model.CronType; +import io.quarkus.scheduler.Scheduled; + public interface SchedulerContext { CronType getCronType(); @@ -13,6 +15,12 @@ public interface SchedulerContext { boolean forceSchedulerStart(); + List getScheduledMethods(String implementation); + + boolean matchesImplementation(Scheduled scheduled, String implementation); + + String autoImplementation(); + @SuppressWarnings("unchecked") default ScheduledInvoker createInvoker(String invokerClassName) { try { diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SyntheticScheduled.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SyntheticScheduled.java index a598ef744ad6f..8345b061c925c 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SyntheticScheduled.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/SyntheticScheduled.java @@ -22,10 +22,11 @@ public final class SyntheticScheduled extends AnnotationLiteral imple private final ConcurrentExecution concurrentExecution; private final SkipPredicate skipPredicate; private final String timeZone; + private final String implementation; public SyntheticScheduled(String identity, String cron, String every, long delay, TimeUnit delayUnit, String delayed, String overdueGracePeriod, ConcurrentExecution concurrentExecution, - SkipPredicate skipPredicate, String timeZone) { + SkipPredicate skipPredicate, String timeZone, String implementation) { this.identity = Objects.requireNonNull(identity); this.cron = Objects.requireNonNull(cron); this.every = Objects.requireNonNull(every); @@ -36,6 +37,7 @@ public SyntheticScheduled(String identity, String cron, String every, long delay this.concurrentExecution = Objects.requireNonNull(concurrentExecution); this.skipPredicate = skipPredicate; this.timeZone = timeZone; + this.implementation = implementation; } @Override @@ -88,6 +90,11 @@ public String timeZone() { return timeZone; } + @Override + public String executeWith() { + return implementation; + } + public String toJson() { if (skipPredicate != null) { throw new IllegalStateException("A skipPredicate instance may not be serialized"); @@ -102,6 +109,7 @@ public String toJson() { json.put("overdueGracePeriod", overdueGracePeriod); json.put("concurrentExecution", concurrentExecution.toString()); json.put("timeZone", timeZone); + json.put("executeWith", implementation); return json.encode(); } @@ -110,7 +118,7 @@ public static SyntheticScheduled fromJson(String json) { return new SyntheticScheduled(jsonObj.getString("identity"), jsonObj.getString("cron"), jsonObj.getString("every"), jsonObj.getLong("delay"), TimeUnit.valueOf(jsonObj.getString("delayUnit")), jsonObj.getString("delayed"), jsonObj.getString("overdueGracePeriod"), ConcurrentExecution.valueOf(jsonObj.getString("concurrentExecution")), - null, jsonObj.getString("timeZone")); + null, jsonObj.getString("timeZone"), jsonObj.getString("executeWith")); } @Override diff --git a/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SchedulerUtilsTest.java b/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SchedulerUtilsTest.java index 09083fce197d2..493d4c10ea602 100644 --- a/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SchedulerUtilsTest.java +++ b/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SchedulerUtilsTest.java @@ -134,6 +134,12 @@ public Class skipExecutionIf() { public String delayed() { return delayed; } + + @Override + public String executeWith() { + return AUTO; + } + }; } } diff --git a/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SyntheticScheduledTest.java b/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SyntheticScheduledTest.java index b80460db3b380..a8119398ff8b3 100644 --- a/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SyntheticScheduledTest.java +++ b/extensions/scheduler/common/src/test/java/io/quarkus/scheduler/common/runtime/util/SyntheticScheduledTest.java @@ -15,7 +15,7 @@ public class SyntheticScheduledTest { @Test public void testJson() { SyntheticScheduled s1 = new SyntheticScheduled("foo", "", "2s", 0, TimeUnit.SECONDS, "1s", "15m", - ConcurrentExecution.PROCEED, null, Scheduled.DEFAULT_TIMEZONE); + ConcurrentExecution.PROCEED, null, Scheduled.DEFAULT_TIMEZONE, Scheduled.AUTO); SyntheticScheduled s2 = SyntheticScheduled.fromJson(s1.toJson()); assertEquals(s1.identity(), s2.identity()); assertEquals(s1.concurrentExecution(), s2.concurrentExecution()); @@ -26,6 +26,7 @@ public void testJson() { assertEquals(s1.delayed(), s2.delayed()); assertEquals(s1.overdueGracePeriod(), s2.overdueGracePeriod()); assertEquals(s1.timeZone(), s2.timeZone()); + assertEquals(s1.executeWith(), s2.executeWith()); } } diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/DiscoveredImplementationsBuildItem.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/DiscoveredImplementationsBuildItem.java new file mode 100644 index 0000000000000..a9b6eda0f3f43 --- /dev/null +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/DiscoveredImplementationsBuildItem.java @@ -0,0 +1,63 @@ +package io.quarkus.scheduler.deployment; + +import java.util.Objects; +import java.util.Set; + +import io.quarkus.builder.item.SimpleBuildItem; +import io.quarkus.scheduler.runtime.CompositeScheduler; +import io.quarkus.scheduler.runtime.Constituent; +import io.quarkus.scheduler.runtime.SchedulerConfig; +import io.smallrye.common.annotation.Identifier; + +/** + * This build item holds all discovered {@link io.quarkus.scheduler.Scheduler} implementations sorted by priority. Higher + * priority goes first. + */ +public final class DiscoveredImplementationsBuildItem extends SimpleBuildItem { + + private final String autoImplementation; + + private final Set implementations; + + private final boolean useCompositeScheduler; + + DiscoveredImplementationsBuildItem(String autoImplementation, Set implementations, boolean useCompositeScheduler) { + this.autoImplementation = Objects.requireNonNull(autoImplementation); + this.implementations = Objects.requireNonNull(implementations); + this.useCompositeScheduler = useCompositeScheduler; + } + + /** + * + * @return the implementation with highest priority + */ + public String getAutoImplementation() { + return autoImplementation; + } + + public Set getImplementations() { + return implementations; + } + + /** + * A composite scheduler is used if multiple scheduler implementations are found and + * {@link SchedulerConfig#useCompositeScheduler} is set to {@code true}. + *

+ * The extension will add: + *

    + *
  • the {@link Constituent} marker qualifier,
  • + *
  • the {@link Identifier} qualifier with the corresponding implementation value.
  • + *
+ * + * @return {@code true} if a composite scheduler is used + * @see CompositeScheduler + */ + public boolean isCompositeSchedulerUsed() { + return useCompositeScheduler && implementations.size() > 1; + } + + public boolean isAutoImplementation(String implementation) { + return autoImplementation.equals(implementation); + } + +} diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerImplementationBuildItem.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerImplementationBuildItem.java new file mode 100644 index 0000000000000..d135dd9eefa1e --- /dev/null +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerImplementationBuildItem.java @@ -0,0 +1,52 @@ +package io.quarkus.scheduler.deployment; + +import org.jboss.jandex.DotName; + +import io.quarkus.builder.item.MultiBuildItem; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; + +/** + * An extension that provides an implementation of {@link Scheduler} must produce this build item. + *

+ * If multiple extensions produce this build item with the same {@link #implementation} value then the build fails. + */ +public final class SchedulerImplementationBuildItem extends MultiBuildItem { + + private final String implementation; + + private final DotName schedulerBeanClass; + + private final int priority; + + public SchedulerImplementationBuildItem(String implementation, DotName schedulerBeanClass, int priority) { + this.implementation = implementation; + this.schedulerBeanClass = schedulerBeanClass; + this.priority = priority; + } + + public String getImplementation() { + return implementation; + } + + public DotName getSchedulerBeanClass() { + return schedulerBeanClass; + } + + /** + * The implementation with highest priority is selected if {@link Scheduled#AUTO} is used. + * + * @return the priority + * @see Scheduled#AUTO + */ + public int getPriority() { + return priority; + } + + @Override + public String toString() { + return "SchedulerImplementationBuildItem [" + (implementation != null ? "implementation=" + implementation + ", " : "") + + "priority=" + priority + "]"; + } + +} diff --git a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java index 1188ec6200408..2db5a37c62741 100644 --- a/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java +++ b/extensions/scheduler/deployment/src/main/java/io/quarkus/scheduler/deployment/SchedulerProcessor.java @@ -10,6 +10,7 @@ import java.time.Duration; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -19,8 +20,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Function; +import java.util.stream.Collectors; import org.jboss.jandex.AnnotationInstance; +import org.jboss.jandex.AnnotationTransformation; import org.jboss.jandex.AnnotationValue; import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; @@ -82,9 +85,12 @@ import io.quarkus.scheduler.common.runtime.MutableScheduledMethod; import io.quarkus.scheduler.common.runtime.SchedulerContext; import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; +import io.quarkus.scheduler.runtime.CompositeScheduler; +import io.quarkus.scheduler.runtime.Constituent; import io.quarkus.scheduler.runtime.SchedulerConfig; import io.quarkus.scheduler.runtime.SchedulerRecorder; import io.quarkus.scheduler.runtime.SimpleScheduler; +import io.smallrye.common.annotation.Identifier; public class SchedulerProcessor { @@ -97,9 +103,58 @@ public class SchedulerProcessor { static final String NESTED_SEPARATOR = "$_"; @BuildStep - void beans(Capabilities capabilities, BuildProducer additionalBeans) { + SchedulerImplementationBuildItem implementation() { + return new SchedulerImplementationBuildItem(Scheduled.SIMPLE, DotName.createSimple(SimpleScheduler.class), 0); + } + + @BuildStep + void compositeScheduler(SchedulerConfig config, List implementations, + BuildProducer additionalBeans, + BuildProducer discoveredImplementations) { + List sorted = implementations.stream() + .sorted(Comparator.comparingInt(SchedulerImplementationBuildItem::getPriority).reversed()).toList(); + Set found = sorted.stream().map(SchedulerImplementationBuildItem::getImplementation) + .collect(Collectors.toUnmodifiableSet()); + if (found.size() != implementations.size()) { + throw new IllegalStateException("Invalid scheduler implementations detected: " + implementations); + } + DiscoveredImplementationsBuildItem discovered = new DiscoveredImplementationsBuildItem( + sorted.get(0).getImplementation(), found, + config.useCompositeScheduler); + discoveredImplementations.produce(discovered); + if (implementations.size() > 1 && config.useCompositeScheduler) { + // If multiple implementations are needed we have to register the CompositeScheduler, and + // instruct the extensions that provide an implementation to modify the bean metadata, i.e. add the marker qualifier + additionalBeans.produce(AdditionalBeanBuildItem.builder() + .addBeanClasses(Constituent.class, CompositeScheduler.class).setUnremovable().build()); + } + } + + @BuildStep + void transformSchedulerBeans(DiscoveredImplementationsBuildItem discoveredImplementations, + List implementations, + BuildProducer transformer) { + if (discoveredImplementations.isCompositeSchedulerUsed()) { + Map implsToBeanClass = implementations.stream() + .collect(Collectors.toMap(SchedulerImplementationBuildItem::getSchedulerBeanClass, + SchedulerImplementationBuildItem::getImplementation)); + transformer.produce(new AnnotationsTransformerBuildItem(AnnotationTransformation.forClasses() + .whenClass(c -> implsToBeanClass.containsKey(c.name())) + .transform(c -> { + c.add(AnnotationInstance.builder(Constituent.class).build()); + c.add(AnnotationInstance.builder(Identifier.class) + .add("value", implsToBeanClass.get(c.declaration().asClass().name())).build()); + }))); + } + } + + @BuildStep + void beans(DiscoveredImplementationsBuildItem discoveredImplementations, + BuildProducer additionalBeans) { additionalBeans.produce(new AdditionalBeanBuildItem(Scheduled.ApplicationNotRunning.class)); - if (capabilities.isMissing(Capability.QUARTZ)) { + if (discoveredImplementations.getImplementations().size() == 1 + || discoveredImplementations.isCompositeSchedulerUsed()) { + // Quartz extension is not present or composite scheduler is used additionalBeans.produce(new AdditionalBeanBuildItem(SimpleScheduler.class)); } } @@ -195,7 +250,8 @@ private void collectScheduledMethods(IndexView index, TransformedAnnotationsBuil @BuildStep void validateScheduledBusinessMethods(SchedulerConfig config, List scheduledMethods, ValidationPhaseBuildItem validationPhase, BuildProducer validationErrors, - Capabilities capabilities, BeanArchiveIndexBuildItem beanArchiveIndex) { + Capabilities capabilities, BeanArchiveIndexBuildItem beanArchiveIndex, + DiscoveredImplementationsBuildItem discoveredImplementations) { List errors = new ArrayList<>(); Map encounteredIdentities = new HashMap<>(); Set methodDescriptions = new HashSet<>(); @@ -252,7 +308,7 @@ void validateScheduledBusinessMethods(SchedulerConfig config, List unremovableBeans() { public FeatureBuildItem build(SchedulerConfig config, BuildProducer syntheticBeans, SchedulerRecorder recorder, List scheduledMethods, BuildProducer generatedClasses, BuildProducer reflectiveClass, - AnnotationProxyBuildItem annotationProxy, List schedulerForcedStartItems) { + AnnotationProxyBuildItem annotationProxy, List schedulerForcedStartItems, + DiscoveredImplementationsBuildItem discoveredImplementations) { List scheduledMetadata = new ArrayList<>(); ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClasses, new Function() { @@ -330,7 +387,8 @@ public String apply(String name) { } syntheticBeans.produce(SyntheticBeanBuildItem.configure(SchedulerContext.class).setRuntimeInit() - .supplier(recorder.createContext(config, scheduledMetadata, !schedulerForcedStartItems.isEmpty())) + .supplier(recorder.createContext(config, scheduledMetadata, !schedulerForcedStartItems.isEmpty(), + discoveredImplementations.getAutoImplementation())) .done()); return new FeatureBuildItem(Feature.SCHEDULER); @@ -530,8 +588,8 @@ private String generateInvoker(ScheduledBusinessMethodItem scheduledMethod, Clas } private Throwable validateScheduled(CronParser parser, AnnotationInstance schedule, - Map encounteredIdentities, - BeanDeploymentValidator.ValidationContext validationContext, long checkPeriod, IndexView index) { + Map encounteredIdentities, BeanDeploymentValidator.ValidationContext validationContext, + long checkPeriod, IndexView index, DiscoveredImplementationsBuildItem discoveredImplementations) { MethodInfo method = schedule.target().asMethod(); AnnotationValue cronValue = schedule.value("cron"); AnnotationValue everyValue = schedule.value("every"); @@ -645,6 +703,22 @@ private Throwable validateScheduled(CronParser parser, AnnotationInstance schedu } } } + + AnnotationValue executeWithValue = schedule.value("executeWith"); + if (executeWithValue != null) { + String implementation = executeWithValue.asString(); + if (!Scheduled.AUTO.equals(implementation)) { + if (!discoveredImplementations.getImplementations().contains(implementation)) { + return new IllegalStateException( + "The required scheduler implementation was not discovered in application: " + implementation); + } else if (!discoveredImplementations.isCompositeSchedulerUsed() + && !discoveredImplementations.isAutoImplementation(implementation)) { + return new IllegalStateException( + "The required scheduler implementation is not available because the composite scheduler is not used: " + + implementation); + } + } + } return null; } diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/composite/SchedulerImplementationNotDiscoveredTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/composite/SchedulerImplementationNotDiscoveredTest.java new file mode 100644 index 0000000000000..045bfc8553cb4 --- /dev/null +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/composite/SchedulerImplementationNotDiscoveredTest.java @@ -0,0 +1,36 @@ +package io.quarkus.scheduler.test.composite; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class SchedulerImplementationNotDiscoveredTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .assertException(t -> { + assertThat(t).cause().isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "The required scheduler implementation was not discovered in application: QUARTZ"); + }); + + @Test + public void test() { + fail(); + } + + static class Jobs { + + @Scheduled(every = "1s", executeWith = Scheduled.QUARTZ) + void quartz() { + } + + } +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java new file mode 100644 index 0000000000000..55d90854249bd --- /dev/null +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java @@ -0,0 +1,192 @@ +package io.quarkus.scheduler.runtime; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import jakarta.enterprise.inject.Typed; +import jakarta.inject.Singleton; + +import io.quarkus.arc.All; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.quarkus.scheduler.Trigger; +import io.quarkus.scheduler.common.runtime.AbstractJobDefinition; +import io.quarkus.scheduler.common.runtime.SchedulerContext; + +/** + * The composite scheduler is only used in case of multiple {@link Scheduler} implementations are required. + * + * @see Scheduled#executeWith() + */ +@Typed(Scheduler.class) +@Singleton +public class CompositeScheduler implements Scheduler { + + private final List schedulers; + + private final SchedulerContext schedulerContext; + + CompositeScheduler(@All @Constituent List schedulers, SchedulerContext schedulerContext) { + this.schedulers = schedulers; + this.schedulerContext = schedulerContext; + } + + @Override + public void pause() { + for (Scheduler scheduler : schedulers) { + scheduler.pause(); + } + } + + @Override + public void pause(String identity) { + for (Scheduler scheduler : schedulers) { + scheduler.pause(identity); + } + } + + @Override + public void resume() { + for (Scheduler scheduler : schedulers) { + scheduler.resume(); + } + } + + @Override + public void resume(String identity) { + for (Scheduler scheduler : schedulers) { + scheduler.resume(identity); + } + } + + @Override + public boolean isPaused(String identity) { + for (Scheduler scheduler : schedulers) { + if (scheduler.isPaused(identity)) { + return true; + } + } + return false; + } + + @Override + public boolean isRunning() { + // IMPL NOTE: we return true if at least one of the schedulers is running + for (Scheduler scheduler : schedulers) { + if (scheduler.isRunning()) { + return true; + } + } + return false; + } + + @Override + public List getScheduledJobs() { + List triggers = new ArrayList<>(); + for (Scheduler scheduler : schedulers) { + triggers.addAll(scheduler.getScheduledJobs()); + } + return triggers; + } + + @Override + public Trigger getScheduledJob(String identity) { + for (Scheduler scheduler : schedulers) { + Trigger trigger = scheduler.getScheduledJob(identity); + if (trigger != null) { + return trigger; + } + } + return null; + } + + @Override + public JobDefinition newJob(String identity) { + return new CompositeJobDefinition(identity); + } + + @Override + public Trigger unscheduleJob(String identity) { + for (Scheduler scheduler : schedulers) { + Trigger trigger = scheduler.unscheduleJob(identity); + if (trigger != null) { + return trigger; + } + } + return null; + } + + @Override + public String implementation() { + return Scheduled.AUTO; + } + + class CompositeJobDefinition extends AbstractJobDefinition { + + public CompositeJobDefinition(String identity) { + super(identity); + } + + @Override + public JobDefinition setExecuteWith(String implementation) { + Objects.requireNonNull(implementation); + if (!Scheduled.AUTO.equals(implementation)) { + if (schedulers.stream().map(Scheduler::implementation).noneMatch(implementation::equals)) { + throw new IllegalArgumentException("Scheduler implementation not available: " + implementation); + } + } + return super.setExecuteWith(implementation); + } + + @Override + public Trigger schedule() { + String impl = implementation; + if (Scheduled.AUTO.equals(impl)) { + impl = schedulerContext.autoImplementation(); + } + for (Scheduler scheduler : schedulers) { + if (scheduler.implementation().equals(impl)) { + return copy(scheduler.newJob(identity)).schedule(); + } + } + throw new IllegalStateException("Matching scheduler implementation not found: " + implementation); + } + + private JobDefinition copy(JobDefinition to) { + to.setCron(cron); + to.setInterval(every); + to.setDelayed(delayed); + to.setOverdueGracePeriod(overdueGracePeriod); + to.setConcurrentExecution(concurrentExecution); + to.setTimeZone(timeZone); + to.setExecuteWith(implementation); + if (skipPredicateClass != null) { + to.setSkipPredicate(skipPredicateClass); + } else if (skipPredicate != null) { + to.setSkipPredicate(skipPredicate); + } + if (taskClass != null) { + if (runOnVirtualThread) { + to.setTask(taskClass, runOnVirtualThread); + } else { + to.setTask(taskClass); + } + } else if (task != null) { + if (runOnVirtualThread) { + to.setTask(task, runOnVirtualThread); + } else { + to.setTask(task); + } + } + if (asyncTaskClass != null) { + to.setAsyncTask(asyncTaskClass); + } else if (asyncTask != null) { + to.setAsyncTask(asyncTask); + } + return to; + } + + } + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/Constituent.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/Constituent.java new file mode 100644 index 0000000000000..4fbf7c8a8d5dd --- /dev/null +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/Constituent.java @@ -0,0 +1,26 @@ +package io.quarkus.scheduler.runtime; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.ElementType.TYPE; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import jakarta.inject.Qualifier; + +import io.quarkus.scheduler.Scheduler; + +/** + * This qualifier is used to mark a constituent of a composite {@link Scheduler}, i.e. to distinguish various scheduler + * implementations. + */ +@Qualifier +@Documented +@Retention(RUNTIME) +@Target({ TYPE, PARAMETER, FIELD }) +public @interface Constituent { + +} diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java index 23ce44a235145..970ae9923e34b 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerConfig.java @@ -6,6 +6,7 @@ import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; @ConfigRoot(phase = ConfigPhase.BUILD_AND_RUN_TIME_FIXED) public class SchedulerConfig { @@ -30,4 +31,15 @@ public class SchedulerConfig { */ @ConfigItem(name = "tracing.enabled") public boolean tracingEnabled; + + /** + * By default, only one {@link Scheduler} implementation is used. If set to {@code true} then a composite {@link Scheduler} + * that delegates to all running implementations is used. + *

+ * Scheduler implementations will be started depending on the value of {@code quarkus.scheduler.start-mode}, i.e. the + * scheduler is not started unless a relevant {@link io.quarkus.scheduler.Scheduled} business method is found. + */ + @ConfigItem(defaultValue = "false") + public boolean useCompositeScheduler; + } diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java index d0cc4037c9bc1..7198a7deb12b2 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SchedulerRecorder.java @@ -7,6 +7,7 @@ import com.cronutils.model.CronType; import io.quarkus.runtime.annotations.Recorder; +import io.quarkus.scheduler.Scheduled; import io.quarkus.scheduler.common.runtime.ImmutableScheduledMethod; import io.quarkus.scheduler.common.runtime.MutableScheduledMethod; import io.quarkus.scheduler.common.runtime.ScheduledMethod; @@ -16,7 +17,7 @@ public class SchedulerRecorder { public Supplier createContext(SchedulerConfig config, - List scheduledMethods, boolean forceSchedulerStart) { + List scheduledMethods, boolean forceSchedulerStart, String autoImplementation) { // Defensive design - make an immutable copy of the scheduled method metadata List metadata = immutableCopy(scheduledMethods); return new Supplier() { @@ -38,6 +39,31 @@ public List getScheduledMethods() { public boolean forceSchedulerStart() { return forceSchedulerStart; } + + @Override + public List getScheduledMethods(String implementation) { + List ret = new ArrayList<>(metadata.size()); + for (ScheduledMethod method : metadata) { + for (Scheduled scheduled : method.getSchedules()) { + if (matchesImplementation(scheduled, implementation)) { + ret.add(method); + } + } + } + return ret; + } + + @Override + public boolean matchesImplementation(Scheduled scheduled, String implementation) { + return scheduled.executeWith().equals(implementation) || ((autoImplementation.equals(implementation)) + && scheduled.executeWith().equals(Scheduled.AUTO)); + } + + @Override + public String autoImplementation() { + return autoImplementation; + } + }; } }; diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 8a3d77797146e..0a176bcaa2610 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -131,7 +131,8 @@ public SimpleScheduler(SchedulerContext context, SchedulerRuntimeConfig schedule } StartMode startMode = schedulerRuntimeConfig.startMode.orElse(StartMode.NORMAL); - if (startMode == StartMode.NORMAL && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) { + if (startMode == StartMode.NORMAL && context.getScheduledMethods(Scheduled.SIMPLE).isEmpty() + && !context.forceSchedulerStart()) { this.scheduledExecutor = null; LOG.info("No scheduled business methods found - Simple scheduler will not be started"); return; @@ -168,9 +169,12 @@ public void run() { } // Create triggers and invokers for @Scheduled methods - for (ScheduledMethod method : context.getScheduledMethods()) { + for (ScheduledMethod method : context.getScheduledMethods(Scheduled.SIMPLE)) { int nameSequence = 0; for (Scheduled scheduled : method.getSchedules()) { + if (!context.matchesImplementation(scheduled, Scheduled.SIMPLE)) { + continue; + } nameSequence++; String id = SchedulerUtils.lookUpPropertyValue(scheduled.identity()); if (id.isEmpty()) { @@ -192,6 +196,11 @@ public void run() { } } + @Override + public String implementation() { + return Scheduled.SIMPLE; + } + @Override public JobDefinition newJob(String identity) { Objects.requireNonNull(identity); @@ -724,7 +733,7 @@ public boolean isBlocking() { }; } Scheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed, - overdueGracePeriod, concurrentExecution, skipPredicate, timeZone); + overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation); Optional trigger = createTrigger(identity, null, cronParser, scheduled, defaultOverdueGracePeriod); if (trigger.isPresent()) {