Skip to content

Commit

Permalink
Scheduler: run multiple scheduler implementations
Browse files Browse the repository at this point in the history
- related to quarkusio#41954
  • Loading branch information
mkouba committed Jul 30, 2024
1 parent a02d772 commit b116e51
Show file tree
Hide file tree
Showing 21 changed files with 757 additions and 22 deletions.
26 changes: 25 additions & 1 deletion docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ For example `"0 15 10 * * ?"` fires at 10:15am every day.
[source,java]
----
@Scheduled(cron = "0 15 10 * * ?")
void fireAt10AmEveryDay() { }
void fireAt1015AmEveryDay() { }
----

The syntax used in CRON expressions is controlled by the `quarkus.scheduler.cron-type` property.
Expand Down Expand Up @@ -359,6 +359,30 @@ class Jobs {
----
<1> The return type `Uni<Void>` instructs the scheduler to execute the method on the Vert.x event loop.

=== How to use multiple scheduler implementations

In some cases, it might be useful to choose a scheduler implementation used to execute a scheduled method.
However, only one `Scheduler` implementation is used for all scheduled methods by default.
For example, the `quarkus-quartz` extension provides an implementation that supports clustering but it also removes the simple in-memory implementation from the game.
Now, if clustering is enabled then it's not possible to define a scheduled method that would be executed locally on a single node.
Nevertheless, if you set the `quarkus.scheduler.use-composite-scheduler` config property to `true` then a composite `Scheduler` is used instead.
This means that multiple scheduler implementations are kept running side by side.
Furthermore, it's possible to chose a specific implementation used to execute a scheduled method using `@Scheduled#executeWith()`.

[source,java]
----
class Jobs {
@Scheduled(cron = "0 15 10 * * ?") <1>
void fireAt10AmEveryDay() { }
@Scheduled(every = "1s", executeWith = Scheduled.SIMPLE) <2>
void everySecond() { }
}
----
<1> If the `quarkus-quartz` extension is present then this method will be executed with the Quartz-specific scheduler.
<2> If `quarkus.scheduler.use-composite-scheduler=true` is set then this method will be executed with the simple in-memory implementation provided by the `quarkus-scheduler` extension.

== Scheduler

Quarkus provides a built-in bean of type `io.quarkus.scheduler.Scheduler` that can be injected and used to pause/resume the scheduler and individual scheduled methods identified by a specific `Scheduled#identity()`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@
import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate;
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;

/**
*
*/
public class QuartzProcessor {

private static final DotName JOB = DotName.createSimple(Job.class.getName());
Expand All @@ -77,6 +76,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.QUARTZ);
}

@BuildStep
SchedulerImplementationBuildItem implementation() {
return new SchedulerImplementationBuildItem(Scheduled.QUARTZ, DotName.createSimple(QuartzSchedulerImpl.class), 1);
}

@BuildStep
AdditionalBeanBuildItem beans() {
return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.quartz.test.composite;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class CompositeSchedulerNotUsedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.assertException(t -> {
assertThat(t).cause().isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"The required scheduler implementation is not available because the composite scheduler is not used: SIMPLE");
});

@Test
public void test() {
fail();
}

static class Jobs {

@Scheduled(every = "1s", executeWith = Scheduled.SIMPLE)
void quartz() {
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Identifier;

public class CompositeSchedulerTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true");

@Constituent
QuartzScheduler quartz;

@Constituent
@Identifier("SIMPLE")
Scheduler simple;

@Inject
Scheduler composite;

@Test
public void testExecution() throws InterruptedException {
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));

composite.pause();
Jobs.reset();
assertFalse(composite.isRunning());
assertFalse(Jobs.simpleLatch.await(2, TimeUnit.SECONDS));

composite.resume();
assertTrue(composite.isRunning());
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));
}

static class Jobs {

static CountDownLatch simpleLatch = new CountDownLatch(1);
static CountDownLatch quartzLatch = new CountDownLatch(1);
static CountDownLatch autoLatch = new CountDownLatch(1);

static void reset() {
simpleLatch = new CountDownLatch(1);
quartzLatch = new CountDownLatch(1);
autoLatch = new CountDownLatch(1);
}

@Scheduled(identity = "simple", every = "1s", executeWith = Scheduled.SIMPLE)
void simple() {
simpleLatch.countDown();
}

@Scheduled(identity = "quartz", every = "1s", executeWith = Scheduled.QUARTZ)
void quartz() {
quartzLatch.countDown();
}

@Scheduled(identity = "auto", every = "1s")
void auto() {
autoLatch.countDown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) {
} else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty()
&& !context.forceSchedulerStart()) {
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
Expand Down Expand Up @@ -232,10 +233,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}
};

for (ScheduledMethod method : context.getScheduledMethods()) {
for (ScheduledMethod method : context.getScheduledMethods(Scheduled.QUARTZ)) {
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
if (!context.matchesImplementation(scheduled, Scheduled.QUARTZ)) {
continue;
}
String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity());
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
Expand Down Expand Up @@ -345,6 +349,11 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public String implementation() {
return Scheduled.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
Expand Down Expand Up @@ -893,7 +902,7 @@ public Trigger schedule() {
}
scheduled = true;
SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation);
return createJobDefinitionQuartzTrigger(this, scheduled, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@
*/
String DEFAULT_TIMEZONE = "<<default timezone>>";

/**
* Constant value for {@link #executeWith()} indicating that the implementation should be selected automatically, i.e. the
* implementation with highest priority is used.
*/
String AUTO = "<<auto>>";

/**
* Constant value for {@link #executeWith()} indicating that the simple in-memory implementation provided by the
* {@code quarkus-scheduler} extension should be used.
* <p>
* This implementation has priority {@code 0}.
*/
String SIMPLE = "SIMPLE";

/**
* Constant value for {@link #executeWith()} indicating that the Quartz implementation provided by the
* {@code quarkus-quartz} extension should be used.
* <p>
* This implementation has priority {@code 1}.
*/
String QUARTZ = "QUARTZ";

/**
* Optionally defines a unique identifier for this job.
* <p>
Expand Down Expand Up @@ -205,6 +227,30 @@
*/
String timeZone() default DEFAULT_TIMEZONE;

/**
* Choose a scheduler implementation used to execute a scheduled method.
* <p>
* Only one scheduler implementation is used for all scheduled methods by default. For example, the {@code quarkus-quartz}
* extension provides an implementation that supports clustering but it also removes the simple in-memory implementation
* from the game.
* <p>
* If the {@code quarkus.scheduler.use-composite-scheduler} config property is set to {@code true} then a composite
* scheduler is used instead. This means that multiple scheduler implementations are kept running side by side.
* In this case, it's possible to choose a specific implementation used to execute a scheduled method. By default, the
* implementation with highest priority is selected automatically.
* <p>
* If the {@code quarkus.scheduler.use-composite-scheduler} config property is set to {@code false} (default) and the
* required implementation is not the implementation with highest priority, then the build fails.
* <p>
* In any case, if the required implementation is not available, then the build fails.
*
* @return the implementation to execute this scheduled method
* @see #AUTO
* @see #SIMPLE
* @see #QUARTZ
*/
String executeWith() default AUTO;

@Retention(RUNTIME)
@Target(METHOD)
@interface Schedules {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ public interface Scheduler {

/**
* Identity must not be null and {@code false} is returned for non-existent identity.
* <p>
* Note that this method only returns {@code true} if the job was explicitly paused. I.e. it does not reflect a paused
* scheduler.
*
* @param identity
* @return {@code true} if the job with the given identity is paused, {@code false} otherwise
* @see Scheduled#identity()
* @see #pause(String)
*/
boolean isPaused(String identity);

Expand Down Expand Up @@ -88,6 +92,13 @@ public interface Scheduler {
*/
Trigger unscheduleJob(String identity);

/**
*
* @return the implementation
* @see Scheduled#executeWith()
*/
String implementation();

/**
* The job definition is a builder-like API that can be used to define a job programmatically.
* <p>
Expand Down Expand Up @@ -177,6 +188,16 @@ interface JobDefinition {
*/
JobDefinition setTimeZone(String timeZone);

/**
* {@link Scheduled#executeWith()}
*
* @param implementation
* @return self
* @throws IllegalArgumentException If the selected implementation is not available
* @see Scheduled#executeWith()
*/
JobDefinition setExecuteWith(String implementation);

/**
*
* @param task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;
protected String implementation = Scheduled.AUTO;

public AbstractJobDefinition(String identity) {
this.identity = identity;
Expand Down Expand Up @@ -88,6 +89,12 @@ public JobDefinition setTimeZone(String timeZone) {
return this;
}

@Override
public JobDefinition setExecuteWith(String implementation) {
this.implementation = implementation;
return this;
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.cronutils.model.CronType;

import io.quarkus.scheduler.Scheduled;

public interface SchedulerContext {

CronType getCronType();
Expand All @@ -13,6 +15,10 @@ public interface SchedulerContext {

boolean forceSchedulerStart();

List<ScheduledMethod> getScheduledMethods(String implementation);

boolean matchesImplementation(Scheduled scheduled, String implementation);

@SuppressWarnings("unchecked")
default ScheduledInvoker createInvoker(String invokerClassName) {
try {
Expand Down
Loading

0 comments on commit b116e51

Please sign in to comment.