Skip to content

Commit

Permalink
Scheduler: run multiple scheduler implementations
Browse files Browse the repository at this point in the history
- related to quarkusio#41954
  • Loading branch information
mkouba committed Jul 31, 2024
1 parent a02d772 commit d4bd15a
Show file tree
Hide file tree
Showing 22 changed files with 904 additions and 34 deletions.
26 changes: 25 additions & 1 deletion docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ For example `"0 15 10 * * ?"` fires at 10:15am every day.
[source,java]
----
@Scheduled(cron = "0 15 10 * * ?")
void fireAt10AmEveryDay() { }
void fireAt1015AmEveryDay() { }
----

The syntax used in CRON expressions is controlled by the `quarkus.scheduler.cron-type` property.
Expand Down Expand Up @@ -359,6 +359,30 @@ class Jobs {
----
<1> The return type `Uni<Void>` instructs the scheduler to execute the method on the Vert.x event loop.

=== How to use multiple scheduler implementations

In some cases, it might be useful to choose a scheduler implementation used to execute a scheduled method.
However, only one `Scheduler` implementation is used for all scheduled methods by default.
For example, the `quarkus-quartz` extension provides an implementation that supports clustering but it also removes the simple in-memory implementation from the game.
Now, if clustering is enabled then it's not possible to define a scheduled method that would be executed locally on a single node.
Nevertheless, if you set the `quarkus.scheduler.use-composite-scheduler` config property to `true` then a composite `Scheduler` is used instead.
This means that multiple scheduler implementations are kept running side by side.
Furthermore, it's possible to chose a specific implementation used to execute a scheduled method using `@Scheduled#executeWith()`.

[source,java]
----
class Jobs {
@Scheduled(cron = "0 15 10 * * ?") <1>
void fireAt10AmEveryDay() { }
@Scheduled(every = "1s", executeWith = Scheduled.SIMPLE) <2>
void everySecond() { }
}
----
<1> If the `quarkus-quartz` extension is present then this method will be executed with the Quartz-specific scheduler.
<2> If `quarkus.scheduler.use-composite-scheduler=true` is set then this method will be executed with the simple in-memory implementation provided by the `quarkus-scheduler` extension.

== Scheduler

Quarkus provides a built-in bean of type `io.quarkus.scheduler.Scheduler` that can be injected and used to pause/resume the scheduler and individual scheduled methods identified by a specific `Scheduled#identity()`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@
import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate;
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;

/**
*
*/
public class QuartzProcessor {

private static final DotName JOB = DotName.createSimple(Job.class.getName());
Expand All @@ -77,6 +76,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.QUARTZ);
}

@BuildStep
SchedulerImplementationBuildItem implementation() {
return new SchedulerImplementationBuildItem(Scheduled.QUARTZ, DotName.createSimple(QuartzSchedulerImpl.class), 1);
}

@BuildStep
AdditionalBeanBuildItem beans() {
return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Identifier;

public class CompositeJobDefinitionTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> {
})
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true")
.overrideConfigKey("quarkus.scheduler.start-mode", "forced");

@Constituent
QuartzScheduler quartz;

@Constituent
@Identifier("SIMPLE")
Scheduler simple;

@Inject
Scheduler composite;

static CountDownLatch simpleLatch = new CountDownLatch(1);
static CountDownLatch quartzLatch = new CountDownLatch(1);
static CountDownLatch autoLatch = new CountDownLatch(1);

static void reset() {
simpleLatch = new CountDownLatch(1);
quartzLatch = new CountDownLatch(1);
autoLatch = new CountDownLatch(1);
}

@Test
public void testExecution() throws InterruptedException {

assertEquals("Scheduler implementation not available: bar",
assertThrows(IllegalArgumentException.class, () -> composite.newJob("foo").setExecuteWith("bar")).getMessage());

composite.newJob("simple")
.setInterval("1s")
.setExecuteWith(Scheduled.SIMPLE)
.setTask(se -> {
simpleLatch.countDown();
}).schedule();

composite.newJob("quartz")
.setInterval("1s")
.setExecuteWith(Scheduled.QUARTZ)
.setTask(se -> {
quartzLatch.countDown();
}).schedule();

composite.newJob("auto")
.setInterval("1s")
.setTask(se -> {
autoLatch.countDown();
}).schedule();

assertTrue(simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(autoLatch.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));

composite.pause();
reset();
assertFalse(composite.isRunning());
assertFalse(simpleLatch.await(2, TimeUnit.SECONDS));

composite.resume();
assertTrue(composite.isRunning());
assertTrue(simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(autoLatch.await(5, TimeUnit.SECONDS));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.quartz.test.composite;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class CompositeSchedulerNotUsedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.assertException(t -> {
assertThat(t).cause().isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"The required scheduler implementation is not available because the composite scheduler is not used: SIMPLE");
});

@Test
public void test() {
fail();
}

static class Jobs {

@Scheduled(every = "1s", executeWith = Scheduled.SIMPLE)
void quartz() {
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Identifier;

public class CompositeSchedulerTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true");

@Constituent
QuartzScheduler quartz;

@Constituent
@Identifier("SIMPLE")
Scheduler simple;

@Inject
Scheduler composite;

@Test
public void testExecution() throws InterruptedException {
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));

composite.pause();
Jobs.reset();
assertFalse(composite.isRunning());
assertFalse(Jobs.simpleLatch.await(2, TimeUnit.SECONDS));

composite.resume();
assertTrue(composite.isRunning());
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));
}

static class Jobs {

static CountDownLatch simpleLatch = new CountDownLatch(1);
static CountDownLatch quartzLatch = new CountDownLatch(1);
static CountDownLatch autoLatch = new CountDownLatch(1);

static void reset() {
simpleLatch = new CountDownLatch(1);
quartzLatch = new CountDownLatch(1);
autoLatch = new CountDownLatch(1);
}

@Scheduled(identity = "simple", every = "1s", executeWith = Scheduled.SIMPLE)
void simple() {
simpleLatch.countDown();
}

@Scheduled(identity = "quartz", every = "1s", executeWith = Scheduled.QUARTZ)
void quartz() {
quartzLatch.countDown();
}

@Scheduled(identity = "auto", every = "1s")
void auto() {
autoLatch.countDown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) {
} else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty()
&& !context.forceSchedulerStart()) {
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
Expand Down Expand Up @@ -232,10 +233,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}
};

for (ScheduledMethod method : context.getScheduledMethods()) {
for (ScheduledMethod method : context.getScheduledMethods(Scheduled.QUARTZ)) {
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
if (!context.matchesImplementation(scheduled, Scheduled.QUARTZ)) {
continue;
}
String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity());
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
Expand Down Expand Up @@ -345,6 +349,11 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public String implementation() {
return Scheduled.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
Expand Down Expand Up @@ -893,7 +902,7 @@ public Trigger schedule() {
}
scheduled = true;
SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation);
return createJobDefinitionQuartzTrigger(this, scheduled, null);
}

Expand Down
Loading

0 comments on commit d4bd15a

Please sign in to comment.