Skip to content

Commit

Permalink
Scheduler: run multiple scheduler implementations
Browse files Browse the repository at this point in the history
- related to quarkusio#41954
  • Loading branch information
mkouba committed Jul 30, 2024
1 parent a02d772 commit 8ba2d86
Show file tree
Hide file tree
Showing 20 changed files with 716 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,9 @@
import io.quarkus.quartz.runtime.jdbc.QuarkusPostgreSQLDelegate;
import io.quarkus.quartz.runtime.jdbc.QuarkusStdJDBCDelegate;
import io.quarkus.runtime.configuration.ConfigurationException;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.deployment.SchedulerImplementationBuildItem;

/**
*
*/
public class QuartzProcessor {

private static final DotName JOB = DotName.createSimple(Job.class.getName());
Expand All @@ -77,6 +76,11 @@ FeatureBuildItem feature() {
return new FeatureBuildItem(Feature.QUARTZ);
}

@BuildStep
SchedulerImplementationBuildItem implementation() {
return new SchedulerImplementationBuildItem(Implementation.QUARTZ, DotName.createSimple(QuartzSchedulerImpl.class), 1);
}

@BuildStep
AdditionalBeanBuildItem beans() {
return new AdditionalBeanBuildItem(QuartzSchedulerImpl.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.quartz.test.composite;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.test.QuarkusUnitTest;

public class CompositeSchedulerNotUsedTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.assertException(t -> {
assertThat(t).cause().isInstanceOf(IllegalStateException.class)
.hasMessageContaining(
"The required scheduler implementation is not available because the composite scheduler is not used: SIMPLE");
});

@Test
public void test() {
fail();
}

static class Jobs {

@Scheduled(every = "1s", executeWith = Implementation.SIMPLE)
void quartz() {
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package io.quarkus.quartz.test.composite;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import jakarta.inject.Inject;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.quartz.QuartzScheduler;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduler;
import io.quarkus.scheduler.runtime.Constituent;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Identifier;

public class CompositeSchedulerTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root
.addClasses(Jobs.class))
.overrideConfigKey("quarkus.scheduler.use-composite-scheduler", "true");

@Constituent
QuartzScheduler quartz;

@Constituent
@Identifier("SIMPLE")
Scheduler simple;

@Inject
Scheduler composite;

@Test
public void testExecution() throws InterruptedException {
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));

assertNull(quartz.getScheduledJob("simple"));
assertNotNull(quartz.getScheduledJob("quartz"));
assertNotNull(quartz.getScheduledJob("auto"));

assertNotNull(simple.getScheduledJob("simple"));
assertNull(simple.getScheduledJob("quartz"));
assertNull(simple.getScheduledJob("auto"));

assertNotNull(composite.getScheduledJob("quartz"));
assertNotNull(composite.getScheduledJob("auto"));
assertNotNull(composite.getScheduledJob("simple"));

composite.pause();
Jobs.reset();
assertFalse(composite.isRunning());
assertFalse(Jobs.simpleLatch.await(2, TimeUnit.SECONDS));

composite.resume();
assertTrue(composite.isRunning());
assertTrue(Jobs.simpleLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.quartzLatch.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.autoLatch.await(5, TimeUnit.SECONDS));
}

static class Jobs {

static CountDownLatch simpleLatch = new CountDownLatch(1);
static CountDownLatch quartzLatch = new CountDownLatch(1);
static CountDownLatch autoLatch = new CountDownLatch(1);

static void reset() {
simpleLatch = new CountDownLatch(1);
quartzLatch = new CountDownLatch(1);
autoLatch = new CountDownLatch(1);
}

@Scheduled(identity = "simple", every = "1s", executeWith = Implementation.SIMPLE)
void simple() {
simpleLatch.countDown();
}

@Scheduled(identity = "quartz", every = "1s", executeWith = Implementation.QUARTZ)
void quartz() {
quartzLatch.countDown();
}

@Scheduled(identity = "auto", every = "1s")
void auto() {
autoLatch.countDown();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import io.quarkus.runtime.StartupEvent;
import io.quarkus.scheduler.FailedExecution;
import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.ScheduledJobPaused;
Expand Down Expand Up @@ -198,7 +199,8 @@ public QuartzSchedulerImpl(SchedulerContext context, QuartzSupport quartzSupport
if (!enabled) {
LOGGER.info("Quartz scheduler is disabled by config property and will not be started");
this.scheduler = null;
} else if (!forceStart && context.getScheduledMethods().isEmpty() && !context.forceSchedulerStart()) {
} else if (!forceStart && context.getScheduledMethods(Implementation.QUARTZ).isEmpty()
&& !context.forceSchedulerStart()) {
LOGGER.info("No scheduled business methods found - Quartz scheduler will not be started");
this.scheduler = null;
} else {
Expand Down Expand Up @@ -232,10 +234,13 @@ public org.quartz.Trigger apply(TriggerKey triggerKey) {
}
};

for (ScheduledMethod method : context.getScheduledMethods()) {
for (ScheduledMethod method : context.getScheduledMethods(Implementation.QUARTZ)) {
int nameSequence = 0;

for (Scheduled scheduled : method.getSchedules()) {
if (!context.matchesImplementation(scheduled, Implementation.QUARTZ)) {
continue;
}
String identity = SchedulerUtils.lookUpPropertyValue(scheduled.identity());
if (identity.isEmpty()) {
identity = ++nameSequence + "_" + method.getInvokerClassName();
Expand Down Expand Up @@ -345,6 +350,11 @@ public org.quartz.Scheduler getScheduler() {
return scheduler;
}

@Override
public Implementation implementation() {
return Implementation.QUARTZ;
}

@Override
public void pause() {
if (!enabled) {
Expand Down Expand Up @@ -893,7 +903,7 @@ public Trigger schedule() {
}
scheduled = true;
SyntheticScheduled scheduled = new SyntheticScheduled(identity, cron, every, 0, TimeUnit.MINUTES, delayed,
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone);
overdueGracePeriod, concurrentExecution, skipPredicate, timeZone, implementation);
return createJobDefinitionQuartzTrigger(this, scheduled, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,17 @@
*/
String timeZone() default DEFAULT_TIMEZONE;

/**
* The scheduler implementation used to execute this scheduled method.
* <p>
* By default, the implementation with highest priority is selected automatically.
* <p>
* If the required implementation is not available, then the build fails.
*
* @return the implementation
*/
Implementation executeWith() default Implementation.AUTO;

@Retention(RUNTIME)
@Target(METHOD)
@interface Schedules {
Expand All @@ -213,6 +224,26 @@

}

/**
* Represents a scheduler implementation used to execute a scheduled method.
*/
enum Implementation {

/**
* The implementation is selected automatically.
*/
AUTO,
/**
* Simple in-memory implementation provided by the {@code quarkus-scheduler} extension. This implementation has priority
* {@code 0}.
*/
SIMPLE,
/**
* Quartz implementation provided by the {@code quarkus-quartz} extension. This implementation has priority {@code 1}.
*/
QUARTZ
}

/**
* Represents a strategy to handle concurrent execution of a scheduled method.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import jakarta.enterprise.context.Dependent;

import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.smallrye.mutiny.Uni;

Expand Down Expand Up @@ -44,10 +45,14 @@ public interface Scheduler {

/**
* Identity must not be null and {@code false} is returned for non-existent identity.
* <p>
* Note that this method only returns {@code true} if the job was explicitly paused. I.e. it does not reflect a paused
* scheduler.
*
* @param identity
* @return {@code true} if the job with the given identity is paused, {@code false} otherwise
* @see Scheduled#identity()
* @see #pause(String)
*/
boolean isPaused(String identity);

Expand Down Expand Up @@ -88,6 +93,12 @@ public interface Scheduler {
*/
Trigger unscheduleJob(String identity);

/**
*
* @return the implementation
*/
Scheduled.Implementation implementation();

/**
* The job definition is a builder-like API that can be used to define a job programmatically.
* <p>
Expand Down Expand Up @@ -177,6 +188,16 @@ interface JobDefinition {
*/
JobDefinition setTimeZone(String timeZone);

/**
* {@link Scheduled#executeWith()}
*
* @param implementation
* @return self
* @throws IllegalArgumentException If the selected implementation is not available
* @see Scheduled#executeWith()
*/
JobDefinition setExecuteWith(Implementation implementation);

/**
*
* @param task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.ConcurrentExecution;
import io.quarkus.scheduler.Scheduled.Implementation;
import io.quarkus.scheduler.Scheduled.SkipPredicate;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.Scheduler.JobDefinition;
Expand All @@ -28,6 +29,7 @@ public abstract class AbstractJobDefinition implements JobDefinition {
protected boolean scheduled = false;
protected String timeZone = Scheduled.DEFAULT_TIMEZONE;
protected boolean runOnVirtualThread;
protected Scheduled.Implementation implementation = Implementation.AUTO;

public AbstractJobDefinition(String identity) {
this.identity = identity;
Expand Down Expand Up @@ -88,6 +90,12 @@ public JobDefinition setTimeZone(String timeZone) {
return this;
}

@Override
public JobDefinition setExecuteWith(Implementation implementation) {
this.implementation = implementation;
return this;
}

@Override
public JobDefinition setTask(Consumer<ScheduledExecution> task, boolean runOnVirtualThread) {
checkScheduled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

import com.cronutils.model.CronType;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.Scheduled.Implementation;

public interface SchedulerContext {

CronType getCronType();
Expand All @@ -13,6 +16,10 @@ public interface SchedulerContext {

boolean forceSchedulerStart();

List<ScheduledMethod> getScheduledMethods(Implementation implementation);

boolean matchesImplementation(Scheduled scheduled, Implementation implementation);

@SuppressWarnings("unchecked")
default ScheduledInvoker createInvoker(String invokerClassName) {
try {
Expand Down
Loading

0 comments on commit 8ba2d86

Please sign in to comment.