From 2ffe01295d7fea5814f03657a29edeca770599ca Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Thu, 31 Oct 2024 13:56:39 +0100 Subject: [PATCH] Quartz: introduce Nonconcurrent - the behavior is identical to a Job class annotated with DisallowConcurrentExecution - fixes #44048 --- .../quartz/deployment/QuartzProcessor.java | 21 +++- .../test/NonconcurrentJobDefinitionTest.java | 67 ++++++++++++ .../test/NonconcurrentOnQuartzThreadTest.java | 56 ++++++++++ .../test/NonconcurrentProgrammaticTest.java | 91 ++++++++++++++++ .../quartz/test/NonconcurrentTest.java | 54 ++++++++++ .../programmatic/ProgrammaticJobsTest.java | 6 +- .../java/io/quarkus/quartz/Nonconcurrent.java | 35 ++++++ .../io/quarkus/quartz/QuartzScheduler.java | 14 +++ .../quartz/runtime/QuartzRecorder.java | 5 +- .../quartz/runtime/QuartzSchedulerImpl.java | 101 ++++++++++++++---- .../quarkus/quartz/runtime/QuartzSupport.java | 19 +++- .../java/io/quarkus/scheduler/Scheduler.java | 36 +++---- .../common/runtime/AbstractJobDefinition.java | 57 +++++----- .../programmatic/ProgrammaticJobsTest.java | 6 +- .../scheduler/runtime/CompositeScheduler.java | 8 +- .../scheduler/runtime/SimpleScheduler.java | 4 +- 16 files changed, 499 insertions(+), 81 deletions(-) create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentOnQuartzThreadTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java create mode 100644 extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java create mode 100644 extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java diff --git a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java index 81e2739284704..9c6dfe0f27814 100644 --- a/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java +++ b/extensions/quartz/deployment/src/main/java/io/quarkus/quartz/deployment/QuartzProcessor.java @@ -5,6 +5,7 @@ import java.sql.Connection; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -55,6 +56,7 @@ import io.quarkus.deployment.builditem.nativeimage.NativeImageProxyDefinitionBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.logging.LogCleanupFilterBuildItem; +import io.quarkus.quartz.Nonconcurrent; import io.quarkus.quartz.runtime.QuarkusQuartzConnectionPoolProvider; import io.quarkus.quartz.runtime.QuartzBuildTimeConfig; import io.quarkus.quartz.runtime.QuartzExtensionPointConfig; @@ -69,6 +71,7 @@ import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate; import io.quarkus.runtime.configuration.ConfigurationException; import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.deployment.ScheduledBusinessMethodItem; import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem; public class QuartzProcessor { @@ -79,6 +82,7 @@ public class QuartzProcessor { private static final DotName DELEGATE_HSQLDB = DotName.createSimple(QuarkusHSQLDBDelegate.class.getName()); private static final DotName DELEGATE_MSSQL = DotName.createSimple(QuarkusMSSQLDelegate.class.getName()); private static final DotName DELEGATE_STDJDBC = DotName.createSimple(QuarkusStdJDBCDelegate.class.getName()); + private static final DotName NONCONCURRENT = DotName.createSimple(Nonconcurrent.class); @BuildStep FeatureBuildItem feature() { @@ -313,12 +317,23 @@ public void start(BuildProducer serviceStart, @Record(RUNTIME_INIT) public void quartzSupportBean(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, QuartzRecorder recorder, - BuildProducer syntheticBeanBuildItemBuildProducer, - QuartzJDBCDriverDialectBuildItem driverDialect) { + QuartzJDBCDriverDialectBuildItem driverDialect, + List scheduledMethods, + BuildProducer syntheticBeanBuildItemBuildProducer) { + + Set nonconcurrentMethods = new HashSet<>(); + for (ScheduledBusinessMethodItem m : scheduledMethods) { + if (m.getMethod().hasAnnotation(NONCONCURRENT)) { + nonconcurrentMethods.add(m.getMethod().declaringClass().name() + "#" + m.getMethod().name()); + } + } syntheticBeanBuildItemBuildProducer.produce(SyntheticBeanBuildItem.configure(QuartzSupport.class) .scope(Singleton.class) // this should be @ApplicationScoped but it fails for some reason .setRuntimeInit() - .supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver())).done()); + .supplier(recorder.quartzSupportSupplier(runtimeConfig, buildTimeConfig, driverDialect.getDriver(), + nonconcurrentMethods)) + .done()); } + } diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java new file mode 100644 index 0000000000000..6cb0446db3528 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentJobDefinitionTest.java @@ -0,0 +1,67 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentJobDefinitionTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(Jobs.class)) + .overrideConfigKey("quarkus.scheduler.start-mode", "forced"); + + @Inject + QuartzScheduler scheduler; + + @Test + public void testExecution() throws InterruptedException { + scheduler.newJob("foo") + .setTask(se -> { + Jobs.NONCONCURRENT_COUNTER.incrementAndGet(); + try { + if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (Jobs.NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + Jobs.NONCONCURRENT_LATCH.countDown(); + } + }) + .setInterval("1s") + .setNonconcurrent() + .schedule(); + + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + static class Jobs { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + } + +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentOnQuartzThreadTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentOnQuartzThreadTest.java new file mode 100644 index 0000000000000..dee4f50683f48 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentOnQuartzThreadTest.java @@ -0,0 +1,56 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.Nonconcurrent; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentOnQuartzThreadTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(Jobs.class)) + .overrideConfigKey("quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread", + "true"); + + @Test + public void testExecution() throws InterruptedException { + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + static class Jobs { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Nonconcurrent + @Scheduled(identity = "foo", every = "1s") + void nonconcurrent() throws InterruptedException { + NONCONCURRENT_COUNTER.incrementAndGet(); + if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + if (NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + NONCONCURRENT_LATCH.countDown(); + } + } + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java new file mode 100644 index 0000000000000..5ebeb934053c7 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentProgrammaticTest.java @@ -0,0 +1,91 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobBuilder; +import org.quartz.JobDetail; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.quartz.SchedulerException; +import org.quartz.SimpleScheduleBuilder; +import org.quartz.Trigger; +import org.quartz.TriggerBuilder; + +import io.quarkus.quartz.QuartzScheduler; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.Scheduler; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentProgrammaticTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root + .addClasses(Jobs.class)) + .overrideConfigKey("quarkus.scheduler.start-mode", "halted"); + + @Inject + QuartzScheduler scheduler; + + @Test + public void testExecution() throws SchedulerException, InterruptedException { + JobDetail job = JobBuilder.newJob(Jobs.class) + .withIdentity("foo", Scheduler.class.getName()) + .build(); + Trigger trigger = TriggerBuilder.newTrigger() + .withIdentity("foo", Scheduler.class.getName()) + .startNow() + .withSchedule(SimpleScheduleBuilder.simpleSchedule() + .withIntervalInSeconds(1) + .repeatForever()) + .build(); + scheduler.getScheduler().scheduleJob(job, trigger); + + scheduler.resume(); + + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + @DisallowConcurrentExecution + static class Jobs implements Job { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + Jobs.NONCONCURRENT_COUNTER.incrementAndGet(); + try { + if (!Jobs.CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + if (Jobs.NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + Jobs.NONCONCURRENT_LATCH.countDown(); + } + } + + } + +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java new file mode 100644 index 0000000000000..ec26ba9be0524 --- /dev/null +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/NonconcurrentTest.java @@ -0,0 +1,54 @@ +package io.quarkus.quartz.test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.quartz.Nonconcurrent; +import io.quarkus.scheduler.Scheduled; +import io.quarkus.test.QuarkusUnitTest; + +public class NonconcurrentTest { + + @RegisterExtension + static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> root.addClasses(Jobs.class)); + + @Test + public void testExecution() throws InterruptedException { + assertTrue(Jobs.NONCONCURRENT_LATCH.await(10, TimeUnit.SECONDS), + String.format("nonconcurrent() executed: %sx", Jobs.NONCONCURRENT_COUNTER.get())); + } + + static class Jobs { + + static final CountDownLatch NONCONCURRENT_LATCH = new CountDownLatch(1); + static final CountDownLatch CONCURRENT_LATCH = new CountDownLatch(5); + + static final AtomicInteger NONCONCURRENT_COUNTER = new AtomicInteger(0); + + @Nonconcurrent + @Scheduled(identity = "foo", every = "1s") + void nonconcurrent() throws InterruptedException { + NONCONCURRENT_COUNTER.incrementAndGet(); + if (!CONCURRENT_LATCH.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("nonconcurrent() execution blocked too long..."); + } + if (NONCONCURRENT_COUNTER.get() == 1) { + // concurrent() executed >= 5x and nonconcurrent() 1x + NONCONCURRENT_LATCH.countDown(); + } + } + + @Scheduled(identity = "bar", every = "1s") + void concurrent() throws InterruptedException { + CONCURRENT_LATCH.countDown(); + } + + } +} diff --git a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java index d2f5e62a5a55e..aa027694004b9 100644 --- a/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java +++ b/extensions/quartz/deployment/src/test/java/io/quarkus/quartz/test/programmatic/ProgrammaticJobsTest.java @@ -69,7 +69,7 @@ public void testJobs() throws InterruptedException { .setSkipPredicate(AlwaysSkipPredicate.class) .schedule(); - Scheduler.JobDefinition job1 = scheduler.newJob("foo") + Scheduler.JobDefinition job1 = scheduler.newJob("foo") .setInterval("1s") .setTask(ec -> { assertTrue(Arc.container().requestContext().isActive()); @@ -79,7 +79,7 @@ public void testJobs() throws InterruptedException { assertEquals("Sync task was already set", assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage()); - Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); + Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); assertEquals("Either sync or async task must be set", assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage()); job2.setTask(ec -> { @@ -117,7 +117,7 @@ public void testJobs() throws InterruptedException { @Test public void testAsyncJob() throws InterruptedException, SchedulerException { String identity = "fooAsync"; - JobDefinition asyncJob = scheduler.newJob(identity) + JobDefinition asyncJob = scheduler.newJob(identity) .setInterval("1s") .setAsyncTask(ec -> { assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext()); diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java new file mode 100644 index 0000000000000..27bcaa2104b04 --- /dev/null +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/Nonconcurrent.java @@ -0,0 +1,35 @@ +package io.quarkus.quartz; + +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; + +import io.quarkus.scheduler.Scheduled; +import io.quarkus.scheduler.SkippedExecution; + +/** + * A scheduled method annotated with this annotation may not be executed concurrently. The behavior is identical to a + * {@link Job} class annotated with {@link DisallowConcurrentExecution}. + *

+ * If {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to + * {@code false} the execution of a scheduled method is offloaded to a specific Quarkus thread pool but the triggering Quartz + * thread is blocked until the execution is finished. Therefore, make sure the Quartz thread pool is configured appropriately. + *

+ * If {@code quarkus.quartz.run-blocking-scheduled-method-on-quartz-thread} is set to {@code true} the scheduled method is + * invoked on a thread managed by Quartz. + *

+ * Unlike with {@link Scheduled.ConcurrentExecution#SKIP} the {@link SkippedExecution} event is never fired if a method + * execution is skipped by Quartz. + * + * @see DisallowConcurrentExecution + */ +@Target(METHOD) +@Retention(RUNTIME) +public @interface Nonconcurrent { + +} diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java index 395a6de8369a4..60c30ab3d7292 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/QuartzScheduler.java @@ -13,4 +13,18 @@ public interface QuartzScheduler extends Scheduler { */ org.quartz.Scheduler getScheduler(); + @Override + QuartzJobDefinition newJob(String identity); + + interface QuartzJobDefinition extends JobDefinition { + + /** + * + * @return self + * @see Nonconcurrent + */ + QuartzJobDefinition setNonconcurrent(); + + } + } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java index 9a1bd26cae449..7ea820528fcd6 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzRecorder.java @@ -1,6 +1,7 @@ package io.quarkus.quartz.runtime; import java.util.Optional; +import java.util.Set; import java.util.function.Supplier; import io.quarkus.runtime.annotations.Recorder; @@ -9,11 +10,11 @@ public class QuartzRecorder { public Supplier quartzSupportSupplier(QuartzRuntimeConfig runtimeConfig, - QuartzBuildTimeConfig buildTimeConfig, Optional driverDialect) { + QuartzBuildTimeConfig buildTimeConfig, Optional driverDialect, Set nonconcurrentMethods) { return new Supplier() { @Override public QuartzSupport get() { - return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect); + return new QuartzSupport(runtimeConfig, buildTimeConfig, driverDialect, nonconcurrentMethods); } }; } diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java index e67a8c9f41ac9..a231c29e65b61 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSchedulerImpl.java @@ -13,9 +13,11 @@ import java.util.Properties; import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -40,6 +42,7 @@ import org.jboss.logging.Logger; import org.quartz.CronScheduleBuilder; +import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobBuilder; import org.quartz.JobDetail; @@ -60,6 +63,7 @@ import org.quartz.spi.TriggerFiredBundle; import io.quarkus.arc.Subclass; +import io.quarkus.quartz.Nonconcurrent; import io.quarkus.quartz.QuartzScheduler; import io.quarkus.runtime.StartupEvent; import io.quarkus.scheduler.DelayedExecution; @@ -223,7 +227,8 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { invoker.isBlocking() && runtimeConfig.runBlockingScheduledMethodOnQuartzThread, SchedulerUtils.parseExecutionMaxDelayAsMillis(scheduled), blockingExecutor); - JobDetail jobDetail = createJobDetail(identity, method.getInvokerClassName()); + JobDetail jobDetail = createJobBuilder(identity, method.getInvokerClassName(), + quartzSupport.isNonconcurrent(method)).build(); Optional> triggerBuilder = createTrigger(identity, scheduled, runtimeConfig, jobDetail); @@ -471,7 +476,7 @@ public Trigger getScheduledJob(String identity) { } @Override - public JobDefinition newJob(String identity) { + public QuartzJobDefinition newJob(String identity) { if (!isStarted()) { throw notStarted(); } @@ -479,7 +484,7 @@ public JobDefinition newJob(String identity) { if (scheduledTasks.containsKey(identity)) { throw new IllegalStateException("A job with this identity is already scheduled: " + identity); } - return new QuartzJobDefinition(identity); + return new QuartzJobDefinitionImpl(identity); } @Override @@ -582,13 +587,15 @@ private Properties getSchedulerConfigurationProperties(QuartzSupport quartzSuppo props.put(StdSchedulerFactory.PROP_SCHED_RMI_PROXY, "false"); props.put(StdSchedulerFactory.PROP_JOB_STORE_CLASS, buildTimeConfig.storeType.clazz); + // The org.quartz.jobStore.misfireThreshold can be used for all supported job stores + props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", + "" + runtimeConfig.misfireThreshold.toMillis()); + if (buildTimeConfig.storeType.isDbStore()) { String dataSource = buildTimeConfig.dataSourceName.orElse("QUARKUS_QUARTZ_DEFAULT_DATASOURCE"); QuarkusQuartzConnectionPoolProvider.setDataSourceName(dataSource); boolean serializeJobData = buildTimeConfig.serializeJobData.orElse(false); props.put(StdSchedulerFactory.PROP_JOB_STORE_USE_PROP, serializeJobData ? "false" : "true"); - props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".misfireThreshold", - "" + runtimeConfig.misfireThreshold.toMillis()); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".tablePrefix", buildTimeConfig.tablePrefix); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".dataSource", dataSource); props.put(StdSchedulerFactory.PROP_JOB_STORE_PREFIX + ".driverDelegateClass", @@ -687,13 +694,15 @@ StartMode initStartMode(SchedulerRuntimeConfig schedulerRuntimeConfig, QuartzRun } } - private JobDetail createJobDetail(String identity, String invokerClassName) { - return JobBuilder.newJob(InvokerJob.class) + private JobBuilder createJobBuilder(String identity, String invokerClassName, boolean noncurrent) { + Class jobClass = noncurrent ? NonconcurrentInvokerJob.class + : InvokerJob.class; + return JobBuilder.newJob(jobClass) // new JobKey(identity, "io.quarkus.scheduler.Scheduler") .withIdentity(identity, Scheduler.class.getName()) // this info is redundant but keep it for backward compatibility .usingJobData(INVOKER_KEY, invokerClassName) - .requestRecovery().build(); + .requestRecovery(); } /** @@ -815,12 +824,26 @@ private Optional> createTrigger(String identity, Scheduled sch return Optional.of(triggerBuilder); } - class QuartzJobDefinition extends AbstractJobDefinition implements ExecutionMetadata { + class QuartzJobDefinitionImpl extends AbstractJobDefinition + implements ExecutionMetadata, QuartzJobDefinition { + + private boolean nonconcurrent; - QuartzJobDefinition(String id) { + QuartzJobDefinitionImpl(String id) { super(id); } + @Override + public QuartzJobDefinition setNonconcurrent() { + nonconcurrent = true; + return self(); + } + + @Override + public boolean nonconcurrent() { + return nonconcurrent; + } + @Override public boolean isRunOnVirtualThread() { return runOnVirtualThread; @@ -857,7 +880,7 @@ public Class skipPredicateClass() { } @Override - public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { + public QuartzJobDefinition setSkipPredicate(SkipPredicate skipPredicate) { if (storeType.isDbStore() && skipPredicateClass == null) { throw new IllegalStateException( "A skip predicate instance cannot be scheduled programmatically if DB store type is used; register a skip predicate class instead"); @@ -866,7 +889,7 @@ public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { } @Override - public JobDefinition setTask(Consumer task, boolean runOnVirtualThread) { + public QuartzJobDefinition setTask(Consumer task, boolean runOnVirtualThread) { if (storeType.isDbStore() && taskClass == null) { throw new IllegalStateException( "A task instance cannot be scheduled programmatically if DB store type is used; register a task class instead"); @@ -875,7 +898,7 @@ public JobDefinition setTask(Consumer task, boolean runOnVir } @Override - public JobDefinition setAsyncTask(Function> asyncTask) { + public QuartzJobDefinition setAsyncTask(Function> asyncTask) { if (storeType.isDbStore() && asyncTaskClass == null) { throw new IllegalStateException( "An async task instance cannot be scheduled programmatically if DB store type is used; register an async task class instead"); @@ -912,12 +935,15 @@ interface ExecutionMetadata { SkipPredicate skipPredicate(); Class skipPredicateClass(); + + boolean nonconcurrent(); } static final String SCHEDULED_METADATA = "scheduled_metadata"; static final String EXECUTION_METADATA_TASK_CLASS = "execution_metadata_task_class"; static final String EXECUTION_METADATA_ASYNC_TASK_CLASS = "execution_metadata_async_task_class"; static final String EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD = "execution_metadata_run_on_virtual_thread"; + static final String EXECUTION_METADATA_NONCONCURRENT = "execution_metadata_nonconcurrent"; static final String EXECUTION_METADATA_SKIP_PREDICATE_CLASS = "execution_metadata_skip_predicate_class"; QuartzTrigger createJobDefinitionQuartzTrigger(ExecutionMetadata executionMetadata, SyntheticScheduled scheduled, @@ -966,11 +992,8 @@ public boolean isBlocking() { }; } - JobBuilder jobBuilder = JobBuilder.newJob(InvokerJob.class) - // new JobKey(identity, "io.quarkus.scheduler.Scheduler") - .withIdentity(scheduled.identity(), Scheduler.class.getName()) - // this info is redundant but keep it for backward compatibility - .usingJobData(INVOKER_KEY, QuartzSchedulerImpl.class.getName()); + JobBuilder jobBuilder = createJobBuilder(scheduled.identity(), QuartzSchedulerImpl.class.getName(), + executionMetadata.nonconcurrent()); if (storeType.isDbStore()) { jobBuilder.usingJobData(SCHEDULED_METADATA, scheduled.toJson()) .usingJobData(EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD, Boolean.toString(runOnVirtualThread)); @@ -1045,6 +1068,23 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) { return quartzTrigger; } + /** + * @see Nonconcurrent + */ + @DisallowConcurrentExecution + static class NonconcurrentInvokerJob extends InvokerJob { + + NonconcurrentInvokerJob(QuartzTrigger trigger, Vertx vertx) { + super(trigger, vertx); + } + + @Override + boolean awaitResult() { + return true; + } + + } + /** * Although this class is not part of the public API it must not be renamed in order to preserve backward compatibility. The * name of this class can be stored in a Quartz table in the database. See https://github.com/quarkusio/quarkus/issues/29177 @@ -1060,11 +1100,24 @@ static class InvokerJob implements Job { this.vertx = vertx; } + boolean awaitResult() { + return false; + } + @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { if (trigger != null && trigger.invoker != null) { // could be null from previous runs try { - trigger.invoker.invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); + CompletionStage ret = trigger.invoker + .invoke(new QuartzScheduledExecution(trigger, jobExecutionContext)); + if (awaitResult()) { + try { + ret.toCompletableFuture().get(); + } catch (ExecutionException | CancellationException e) { + LOGGER.warnf("Unable to retrieve result for job %s: %s", + jobExecutionContext.getJobDetail().getKey().getName(), e.toString()); + } + } } catch (Exception e) { // already logged by the StatusEmitterInvoker } @@ -1190,6 +1243,9 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr // This is a job backed by a @Scheduled method or a JobDefinition return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx); } + if (jobClass.equals(NonconcurrentInvokerJob.class)) { + return new NonconcurrentInvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx); + } if (Subclass.class.isAssignableFrom(jobClass)) { // Get the original class from an intercepted bean class jobClass = (Class) jobClass.getSuperclass(); @@ -1218,6 +1274,7 @@ static class SerializedExecutionMetadata implements ExecutionMetadata { private final Class>> asyncTaskClass; private final boolean runOnVirtualThread; private final Class skipPredicateClass; + private final boolean nonconcurrent; @SuppressWarnings("unchecked") public SerializedExecutionMetadata(JobDetail jobDetail) { @@ -1249,6 +1306,7 @@ public SerializedExecutionMetadata(JobDetail jobDetail) { } this.runOnVirtualThread = Boolean .parseBoolean(jobDetail.getJobDataMap().getString(EXECUTION_METADATA_RUN_ON_VIRTUAL_THREAD)); + this.nonconcurrent = Boolean.parseBoolean(jobDetail.getJobDataMap().getString(EXECUTION_METADATA_NONCONCURRENT)); } @Override @@ -1271,6 +1329,11 @@ public Class>> asyncTaskClass() return asyncTaskClass; } + @Override + public boolean nonconcurrent() { + return nonconcurrent; + } + @Override public boolean isRunOnVirtualThread() { return runOnVirtualThread; diff --git a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java index b343422373b78..18944bba97041 100644 --- a/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java +++ b/extensions/quartz/runtime/src/main/java/io/quarkus/quartz/runtime/QuartzSupport.java @@ -1,18 +1,25 @@ package io.quarkus.quartz.runtime; import java.util.Optional; +import java.util.Set; + +import io.quarkus.quartz.Nonconcurrent; +import io.quarkus.scheduler.common.runtime.ScheduledMethod; public class QuartzSupport { private final QuartzRuntimeConfig runtimeConfig; private final QuartzBuildTimeConfig buildTimeConfig; private final Optional driverDialect; + // # + private final Set nonconcurrentMethods; public QuartzSupport(QuartzRuntimeConfig runtimeConfig, QuartzBuildTimeConfig buildTimeConfig, - Optional driverDialect) { + Optional driverDialect, Set nonconcurrentMethods) { this.runtimeConfig = runtimeConfig; this.buildTimeConfig = buildTimeConfig; this.driverDialect = driverDialect; + this.nonconcurrentMethods = Set.copyOf(nonconcurrentMethods); } public QuartzRuntimeConfig getRuntimeConfig() { @@ -26,4 +33,14 @@ public QuartzBuildTimeConfig getBuildTimeConfig() { public Optional getDriverDialect() { return driverDialect; } + + /** + * + * @param method + * @return {@code true} if the scheduled method is annotated with {@link Nonconcurrent} + */ + public boolean isNonconcurrent(ScheduledMethod method) { + return nonconcurrentMethods.contains(method.getMethodDescription()); + } + } diff --git a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java index d0d2467c160cc..97dbebb4014c5 100644 --- a/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java +++ b/extensions/scheduler/api/src/main/java/io/quarkus/scheduler/Scheduler.java @@ -103,7 +103,7 @@ public interface Scheduler { * @see Scheduled#identity() * @throws UnsupportedOperationException If the scheduler was not started */ - JobDefinition newJob(String identity); + JobDefinition newJob(String identity); /** * Removes the job previously added via {@link #newJob(String)}. @@ -130,7 +130,7 @@ public interface Scheduler { *

* The implementation is not thread-safe and should not be reused. */ - interface JobDefinition { + interface JobDefinition> { /** * The schedule is defined either by {@link #setCron(String)} or by {@link #setInterval(String)}. If both methods are @@ -142,7 +142,7 @@ interface JobDefinition { * @return self * @see Scheduled#cron() */ - JobDefinition setCron(String cron); + THIS setCron(String cron); /** * The schedule is defined either by {@link #setCron(String)} or by {@link #setInterval(String)}. If both methods are @@ -157,7 +157,7 @@ interface JobDefinition { * @return self * @see Scheduled#every() */ - JobDefinition setInterval(String every); + THIS setInterval(String every); /** * {@link Scheduled#delayed()} @@ -166,7 +166,7 @@ interface JobDefinition { * @return self * @see Scheduled#delayed() */ - JobDefinition setDelayed(String period); + THIS setDelayed(String period); /** * {@link Scheduled#concurrentExecution()} @@ -175,7 +175,7 @@ interface JobDefinition { * @return self * @see Scheduled#concurrentExecution() */ - JobDefinition setConcurrentExecution(ConcurrentExecution concurrentExecution); + THIS setConcurrentExecution(ConcurrentExecution concurrentExecution); /** * {@link Scheduled#skipExecutionIf()} @@ -184,7 +184,7 @@ interface JobDefinition { * @return self * @see Scheduled#skipExecutionIf() */ - JobDefinition setSkipPredicate(SkipPredicate skipPredicate); + THIS setSkipPredicate(SkipPredicate skipPredicate); /** * {@link Scheduled#skipExecutionIf()} @@ -193,7 +193,7 @@ interface JobDefinition { * @return self * @see Scheduled#skipExecutionIf() */ - JobDefinition setSkipPredicate(Class skipPredicateClass); + THIS setSkipPredicate(Class skipPredicateClass); /** * {@link Scheduled#overdueGracePeriod()} @@ -202,7 +202,7 @@ interface JobDefinition { * @return self * @see Scheduled#overdueGracePeriod() */ - JobDefinition setOverdueGracePeriod(String period); + THIS setOverdueGracePeriod(String period); /** * {@link Scheduled#timeZone()} @@ -210,7 +210,7 @@ interface JobDefinition { * @return self * @see Scheduled#timeZone() */ - JobDefinition setTimeZone(String timeZone); + THIS setTimeZone(String timeZone); /** * {@link Scheduled#executeWith()} @@ -220,7 +220,7 @@ interface JobDefinition { * @throws IllegalArgumentException If the composite scheduler is used and the selected implementation is not available * @see Scheduled#executeWith() */ - JobDefinition setExecuteWith(String implementation); + THIS setExecuteWith(String implementation); /** * {@link Scheduled#executionMaxDelay()} @@ -229,14 +229,14 @@ interface JobDefinition { * @return self * @see Scheduled#executionMaxDelay() */ - JobDefinition setExecutionMaxDelay(String maxDelay); + THIS setExecutionMaxDelay(String maxDelay); /** * * @param task * @return self */ - default JobDefinition setTask(Consumer task) { + default THIS setTask(Consumer task) { return setTask(task, false); } @@ -256,7 +256,7 @@ default JobDefinition setTask(Consumer task) { * @param taskClass * @return self */ - default JobDefinition setTask(Class> taskClass) { + default THIS setTask(Class> taskClass) { return setTask(taskClass, false); } @@ -267,7 +267,7 @@ default JobDefinition setTask(Class> task * @param runOnVirtualThread whether the task must be run on a virtual thread if the JVM allows it. * @return self */ - JobDefinition setTask(Consumer task, boolean runOnVirtualThread); + THIS setTask(Consumer task, boolean runOnVirtualThread); /** * The class must either represent a CDI bean or declare a public no-args constructor. @@ -286,14 +286,14 @@ default JobDefinition setTask(Class> task * @param runOnVirtualThread * @return self */ - JobDefinition setTask(Class> consumerClass, boolean runOnVirtualThread); + THIS setTask(Class> consumerClass, boolean runOnVirtualThread); /** * * @param asyncTask * @return self */ - JobDefinition setAsyncTask(Function> asyncTask); + THIS setAsyncTask(Function> asyncTask); /** * The class must either represent a CDI bean or declare a public no-args constructor. @@ -311,7 +311,7 @@ default JobDefinition setTask(Class> task * @param asyncTaskClass * @return self */ - JobDefinition setAsyncTask(Class>> asyncTaskClass); + THIS setAsyncTask(Class>> asyncTaskClass); /** * Attempts to schedule the job. diff --git a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java index d94f1c612a378..d7a391628b7dd 100644 --- a/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java +++ b/extensions/scheduler/common/src/main/java/io/quarkus/scheduler/common/runtime/AbstractJobDefinition.java @@ -12,7 +12,7 @@ import io.quarkus.scheduler.common.runtime.util.SchedulerUtils; import io.smallrye.mutiny.Uni; -public abstract class AbstractJobDefinition implements JobDefinition { +public abstract class AbstractJobDefinition> implements JobDefinition { protected final String identity; protected String cron = ""; @@ -37,104 +37,104 @@ public AbstractJobDefinition(String identity) { } @Override - public JobDefinition setCron(String cron) { + public THIS setCron(String cron) { checkScheduled(); this.cron = Objects.requireNonNull(cron); - return this; + return self(); } @Override - public JobDefinition setInterval(String every) { + public THIS setInterval(String every) { checkScheduled(); this.every = Objects.requireNonNull(every); - return this; + return self(); } @Override - public JobDefinition setDelayed(String period) { + public THIS setDelayed(String period) { checkScheduled(); this.delayed = Objects.requireNonNull(period); - return this; + return self(); } @Override - public JobDefinition setConcurrentExecution(ConcurrentExecution concurrentExecution) { + public THIS setConcurrentExecution(ConcurrentExecution concurrentExecution) { checkScheduled(); this.concurrentExecution = Objects.requireNonNull(concurrentExecution); - return this; + return self(); } @Override - public JobDefinition setSkipPredicate(SkipPredicate skipPredicate) { + public THIS setSkipPredicate(SkipPredicate skipPredicate) { checkScheduled(); this.skipPredicate = Objects.requireNonNull(skipPredicate); - return this; + return self(); } @Override - public JobDefinition setSkipPredicate(Class skipPredicateClass) { + public THIS setSkipPredicate(Class skipPredicateClass) { checkScheduled(); this.skipPredicateClass = Objects.requireNonNull(skipPredicateClass); return setSkipPredicate(SchedulerUtils.instantiateBeanOrClass(skipPredicateClass)); } @Override - public JobDefinition setOverdueGracePeriod(String period) { + public THIS setOverdueGracePeriod(String period) { checkScheduled(); this.overdueGracePeriod = Objects.requireNonNull(period); - return this; + return self(); } @Override - public JobDefinition setTimeZone(String timeZone) { + public THIS setTimeZone(String timeZone) { checkScheduled(); this.timeZone = Objects.requireNonNull(timeZone); - return this; + return self(); } @Override - public JobDefinition setExecuteWith(String implementation) { + public THIS setExecuteWith(String implementation) { checkScheduled(); this.implementation = Objects.requireNonNull(implementation); - return this; + return self(); } @Override - public JobDefinition setExecutionMaxDelay(String maxDelay) { + public THIS setExecutionMaxDelay(String maxDelay) { checkScheduled(); this.executionMaxDelay = maxDelay; - return this; + return self(); } @Override - public JobDefinition setTask(Consumer task, boolean runOnVirtualThread) { + public THIS setTask(Consumer task, boolean runOnVirtualThread) { checkScheduled(); if (asyncTask != null) { throw new IllegalStateException("Async task was already set"); } this.task = Objects.requireNonNull(task); this.runOnVirtualThread = runOnVirtualThread; - return this; + return self(); } @Override - public JobDefinition setTask(Class> taskClass, boolean runOnVirtualThread) { + public THIS setTask(Class> taskClass, boolean runOnVirtualThread) { this.taskClass = Objects.requireNonNull(taskClass); return setTask(SchedulerUtils.instantiateBeanOrClass(taskClass), runOnVirtualThread); } @Override - public JobDefinition setAsyncTask(Function> asyncTask) { + public THIS setAsyncTask(Function> asyncTask) { checkScheduled(); if (task != null) { throw new IllegalStateException("Sync task was already set"); } this.asyncTask = Objects.requireNonNull(asyncTask); - return this; + return self(); } @Override - public JobDefinition setAsyncTask(Class>> asyncTaskClass) { + public THIS setAsyncTask(Class>> asyncTaskClass) { this.asyncTaskClass = Objects.requireNonNull(asyncTaskClass); return setAsyncTask(SchedulerUtils.instantiateBeanOrClass(asyncTaskClass)); } @@ -145,4 +145,9 @@ protected void checkScheduled() { } } + @SuppressWarnings("unchecked") + protected THIS self() { + return (THIS) this; + } + } diff --git a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java index 3bd4446a7a44f..cd8910628bd53 100644 --- a/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java +++ b/extensions/scheduler/deployment/src/test/java/io/quarkus/scheduler/test/programmatic/ProgrammaticJobsTest.java @@ -63,7 +63,7 @@ public void testJobs() throws InterruptedException { .setSkipPredicate(AlwaysSkipPredicate.class) .schedule(); - Scheduler.JobDefinition job1 = scheduler.newJob("foo") + Scheduler.JobDefinition job1 = scheduler.newJob("foo") .setInterval("1s") .setTask(ec -> { assertTrue(Arc.container().requestContext().isActive()); @@ -73,7 +73,7 @@ public void testJobs() throws InterruptedException { assertEquals("Sync task was already set", assertThrows(IllegalStateException.class, () -> job1.setAsyncTask(ec -> null)).getMessage()); - Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); + Scheduler.JobDefinition job2 = scheduler.newJob("foo").setCron("0/5 * * * * ?"); assertEquals("Either sync or async task must be set", assertThrows(IllegalStateException.class, () -> job2.schedule()).getMessage()); job2.setTask(ec -> { @@ -110,7 +110,7 @@ public void testJobs() throws InterruptedException { @Test public void testAsyncJob() throws InterruptedException { - JobDefinition asyncJob = scheduler.newJob("fooAsync") + JobDefinition asyncJob = scheduler.newJob("fooAsync") .setInterval("1s") .setAsyncTask(ec -> { assertTrue(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext()); diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java index 3832f3a1fb436..c41f17a47f9f0 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/CompositeScheduler.java @@ -113,7 +113,7 @@ public Trigger getScheduledJob(String identity) { } @Override - public JobDefinition newJob(String identity) { + public CompositeJobDefinition newJob(String identity) { return new CompositeJobDefinition(identity); } @@ -133,14 +133,14 @@ public String implementation() { return Scheduled.AUTO; } - class CompositeJobDefinition extends AbstractJobDefinition { + public class CompositeJobDefinition extends AbstractJobDefinition { public CompositeJobDefinition(String identity) { super(identity); } @Override - public JobDefinition setExecuteWith(String implementation) { + public CompositeJobDefinition setExecuteWith(String implementation) { Objects.requireNonNull(implementation); if (!Scheduled.AUTO.equals(implementation)) { if (schedulers.stream().map(Scheduler::implementation).noneMatch(implementation::equals)) { @@ -164,7 +164,7 @@ public Trigger schedule() { throw new IllegalStateException("Matching scheduler implementation not found: " + implementation); } - private JobDefinition copy(JobDefinition to) { + private JobDefinition copy(JobDefinition to) { to.setCron(cron); to.setInterval(every); to.setDelayed(delayed); diff --git a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java index 595a23e9404aa..50950d516e006 100644 --- a/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java +++ b/extensions/scheduler/runtime/src/main/java/io/quarkus/scheduler/runtime/SimpleScheduler.java @@ -174,7 +174,7 @@ public String implementation() { } @Override - public JobDefinition newJob(String identity) { + public SimpleJobDefinition newJob(String identity) { if (!isStarted()) { throw notStarted(); } @@ -603,7 +603,7 @@ public Instant getScheduledFireTime() { } - class SimpleJobDefinition extends AbstractJobDefinition { + public class SimpleJobDefinition extends AbstractJobDefinition { private final SchedulerConfig schedulerConfig;