Skip to content

Commit

Permalink
Scheduler: run multiple scheduler implementations
Browse files Browse the repository at this point in the history
- related to #41954
  • Loading branch information
mkouba committed Jul 30, 2024
1 parent a02d772 commit 9a053dc
Show file tree
Hide file tree
Showing 20 changed files with 722 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@
import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate;
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;

/**
*
*/
public class QuartzProcessor {

private static final DotName JOB = DotName.createSimple(Job.class.getName());
Expand All @@ -77,6 +76,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.QUARTZ);
}

@BuildStep
SchedulerImplementationBuildItem implementation() {
return new SchedulerImplementationBuildItem(Scheduled.QUARTZ, DotName.createSimple(QuartzSchedulerImpl.class), 1);
}

@BuildStep
AdditionalBeanBuildItem beans() {
return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.quarkus.quartz.test.composite;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.test.QuarkusUnitTest;

public class CompositeSchedulerNotUsedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.assertException(t -> {
assertThat(t).cause().isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"The required scheduler implementation is not available because the composite scheduler is not used: SIMPLE");
});

@Test
public void test() {
fail();
}

static class Jobs {

@Scheduled(every = "1s", executeWith = Scheduled.SIMPLE)
void quartz() {
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Identifier;

public class CompositeSchedulerTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true");

@Constituent
QuartzScheduler quartz;

@Constituent
@Identifier("SIMPLE")
Scheduler simple;

@Inject
Scheduler composite;

@Test
public void testExecution() throws InterruptedException {
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));

composite.pause();
Jobs.reset();
assertFalse(composite.isRunning());
assertFalse(Jobs.simpleLatch.await(2, TimeUnit.SECONDS));

composite.resume();
assertTrue(composite.isRunning());
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));
}

static class Jobs {

static CountDownLatch simpleLatch = new CountDownLatch(1);
static CountDownLatch quartzLatch = new CountDownLatch(1);
static CountDownLatch autoLatch = new CountDownLatch(1);

static void reset() {
simpleLatch = new CountDownLatch(1);
quartzLatch = new CountDownLatch(1);
autoLatch = new CountDownLatch(1);
}

@Scheduled(identity = "simple", every = "1s", executeWith = Scheduled.SIMPLE)
void simple() {
simpleLatch.countDown();
}

@Scheduled(identity = "quartz", every = "1s", executeWith = Scheduled.QUARTZ)
void quartz() {
quartzLatch.countDown();
}

@Scheduled(identity = "auto", every = "1s")
void auto() {
autoLatch.countDown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) {
} else if (!forceStart && context.getScheduledMethods(Scheduled.QUARTZ).isEmpty()
&& !context.forceSchedulerStart()) {
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
Expand Down Expand Up @@ -232,10 +233,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}
};

for (ScheduledMethod method : context.getScheduledMethods()) {
for (ScheduledMethod method : context.getScheduledMethods(Scheduled.QUARTZ)) {
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
if (!context.matchesImplementation(scheduled, Scheduled.QUARTZ)) {
continue;
}
String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity());
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
Expand Down Expand Up @@ -345,6 +349,11 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public String implementation() {
return Scheduled.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
Expand Down Expand Up @@ -893,7 +902,7 @@ public Trigger schedule() {
}
scheduled = true;
SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation);
return createJobDefinitionQuartzTrigger(this, scheduled, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,28 @@
*/
String DEFAULT_TIMEZONE = "<<default timezone>>";

/**
* Constant value for {@link #executeWith()} indicating that the implementation should be selected automatically, i.e. the
* implementation with highest priority is used.
*/
String AUTO = "<<auto>>";

/**
* Constant value for {@link #executeWith()} indicating that the simple in-memory implementation provided by the
* {@code quarkus-scheduler} extension should be used.
* <p>
* This implementation has priority {@code 0}.
*/
String SIMPLE = "SIMPLE";

/**
* Constant value for {@link #executeWith()} indicating that the Quartz implementation provided by the
* {@code quarkus-quartz} extension should be used.
* <p>
* This implementation has priority {@code 1}.
*/
String QUARTZ = "QUARTZ";

/**
* Optionally defines a unique identifier for this job.
* <p>
Expand Down Expand Up @@ -205,6 +227,20 @@
*/
String timeZone() default DEFAULT_TIMEZONE;

/**
* The scheduler implementation used to execute this scheduled method.
* <p>
* By default, the implementation with highest priority is selected automatically.
* <p>
* If the required implementation is not available, then the build fails.
*
* @return the implementation to execute this scheduled method
* @see #AUTO
* @see #SIMPLE
* @see #QUARTZ
*/
String executeWith() default AUTO;

@Retention(RUNTIME)
@Target(METHOD)
@interface Schedules {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,14 @@ public interface Scheduler {

/**
* Identity must not be null and {@code false} is returned for non-existent identity.
* <p>
* Note that this method only returns {@code true} if the job was explicitly paused. I.e. it does not reflect a paused
* scheduler.
*
* @param identity
* @return {@code true} if the job with the given identity is paused, {@code false} otherwise
* @see Scheduled#identity()
* @see #pause(String)
*/
boolean isPaused(String identity);

Expand Down Expand Up @@ -88,6 +92,13 @@ public interface Scheduler {
*/
Trigger unscheduleJob(String identity);

/**
*
* @return the implementation
* @see Scheduled#executeWith()
*/
String implementation();

/**
* The job definition is a builder-like API that can be used to define a job programmatically.
* <p>
Expand Down Expand Up @@ -177,6 +188,16 @@ interface JobDefinition {
*/
JobDefinition setTimeZone(String timeZone);

/**
* {@link Scheduled#executeWith()}
*
* @param implementation
* @return self
* @throws IllegalArgumentException If the selected implementation is not available
* @see Scheduled#executeWith()
*/
JobDefinition setExecuteWith(String implementation);

/**
*
* @param task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;
protected String implementation = Scheduled.AUTO;

public AbstractJobDefinition(String identity) {
this.identity = identity;
Expand Down Expand Up @@ -88,6 +89,12 @@ public JobDefinition setTimeZone(String timeZone) {
return this;
}

@Override
public JobDefinition setExecuteWith(String implementation) {
this.implementation = implementation;
return this;
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.cronutils.model.CronType;

import io.quarkus.scheduler.Scheduled;

public interface SchedulerContext {

CronType getCronType();
Expand All @@ -13,6 +15,10 @@ public interface SchedulerContext {

boolean forceSchedulerStart();

List<ScheduledMethod> getScheduledMethods(String implementation);

boolean matchesImplementation(Scheduled scheduled, String implementation);

@SuppressWarnings("unchecked")
default ScheduledInvoker createInvoker(String invokerClassName) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ public final class SyntheticScheduled extends AnnotationLiteral<Scheduled> imple
private final ConcurrentExecution concurrentExecution;
private final SkipPredicate skipPredicate;
private final String timeZone;
private final String implementation;

public SyntheticScheduled(String identity, String cron, String every, long delay, TimeUnit delayUnit, String delayed,
String overdueGracePeriod, ConcurrentExecution concurrentExecution,
SkipPredicate skipPredicate, String timeZone) {
SkipPredicate skipPredicate, String timeZone, String implementation) {
this.identity = Objects.requireNonNull(identity);
this.cron = Objects.requireNonNull(cron);
this.every = Objects.requireNonNull(every);
Expand All @@ -36,6 +37,7 @@ public SyntheticScheduled(String identity, String cron, String every, long delay
this.concurrentExecution = Objects.requireNonNull(concurrentExecution);
this.skipPredicate = skipPredicate;
this.timeZone = timeZone;
this.implementation = implementation;
}

@Override
Expand Down Expand Up @@ -88,6 +90,11 @@ public String timeZone() {
return timeZone;
}

@Override
public String executeWith() {
return implementation;
}

public String toJson() {
if (skipPredicate != null) {
throw new IllegalStateException("A skipPredicate instance may not be serialized");
Expand All @@ -102,6 +109,7 @@ public String toJson() {
json.put("overdueGracePeriod", overdueGracePeriod);
json.put("concurrentExecution", concurrentExecution.toString());
json.put("timeZone", timeZone);
json.put("executeWith", implementation);
return json.encode();
}

Expand All @@ -110,7 +118,7 @@ public static SyntheticScheduled fromJson(String json) {
return new SyntheticScheduled(jsonObj.getString("identity"), jsonObj.getString("cron"), jsonObj.getString("every"),
jsonObj.getLong("delay"), TimeUnit.valueOf(jsonObj.getString("delayUnit")), jsonObj.getString("delayed"),
jsonObj.getString("overdueGracePeriod"), ConcurrentExecution.valueOf(jsonObj.getString("concurrentExecution")),
null, jsonObj.getString("timeZone"));
null, jsonObj.getString("timeZone"), jsonObj.getString("executeWith"));
}

@Override
Expand Down
Loading

0 comments on commit 9a053dc

Please sign in to comment.