Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler - support non-blocking scheduled methods #24658

Merged
merged 1 commit into from
Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/src/main/asciidoc/scheduler-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,24 @@ The main idea is to keep the the logic to skip the execution outside the schedul

TIP: A CDI event of type `io.quarkus.scheduler.SkippedExecution` is fired when an execution of a scheduled method is skipped.

=== Non-blocking Methods

By default, a scheduled method is executed on the main executor for blocking tasks.
As a result, a technology that is designed to run on a Vert.x event loop (such as Hibernate Reactive) cannot be used inside the method body.
For this reason, a scheduled method that returns `java.util.concurrent.CompletionStage<Void>` or `io.smallrye.mutiny.Uni<Void>`, or is annotated with `@io.smallrye.common.annotation.NonBlocking` is executed on the Vert.x event loop instead.

[source,java]
----
class Jobs {

@Scheduled(every = "1s")
Uni<Void> everySecond() { <1>
// ...do something async
}
}
----
<1> The return type `Uni<Void>` instructs the scheduler to execute the method on the Vert.x event loop.

== Scheduler

Quarkus provides a built-in bean of type `io.quarkus.scheduler.Scheduler` that can be injected and used to pause/resume the scheduler and individual scheduled methods identified by a specific `Scheduled#identity()`.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package io.quarkus.quartz.test.nonblocking;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.event.Observes;
import javax.inject.Singleton;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.scheduler.Scheduled;
import io.quarkus.scheduler.ScheduledExecution;
import io.quarkus.scheduler.SuccessfulExecution;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;

public class NonBlockingScheduledMethodTest {

@RegisterExtension
static final QuarkusUnitTest test = new QuarkusUnitTest()
.withApplicationRoot(root -> root.addClasses(Jobs.class, JobWasExecuted.class));

@Test
public void testVoid() throws InterruptedException {
assertTrue(Jobs.VOID_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.VOID_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_void");
}

@Test
public void testUni() throws InterruptedException {
assertTrue(Jobs.UNI_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.UNI_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_uni");
}

@Test
public void testCompletionStage() throws InterruptedException {
assertTrue(Jobs.CS_LATCH.await(5, TimeUnit.SECONDS));
assertTrue(Jobs.CS_ON_EVENT_LOOP.get());
assertTrue(Jobs.SUCCESS_LATCH.await(5, TimeUnit.SECONDS));
assertEvents("every_cs");
}

private void assertEvents(String id) {
for (SuccessfulExecution exec : Jobs.events) {
if (exec.getExecution().getTrigger().getId().equals(id)) {
return;
}
}
fail("No SuccessfulExecution event fired for " + id + ": " + Jobs.events);
}

static class Jobs {

// jobs executed
static final CountDownLatch VOID_LATCH = new CountDownLatch(1);
static final CountDownLatch UNI_LATCH = new CountDownLatch(1);
static final CountDownLatch CS_LATCH = new CountDownLatch(1);

// jobs executed on the event loop
static final AtomicBoolean VOID_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean UNI_ON_EVENT_LOOP = new AtomicBoolean();
static final AtomicBoolean CS_ON_EVENT_LOOP = new AtomicBoolean();

// sucessfull events
static final CountDownLatch SUCCESS_LATCH = new CountDownLatch(3);
static final List<SuccessfulExecution> events = new CopyOnWriteArrayList<>();

static void onSuccess(@Observes SuccessfulExecution event) {
events.add(event);
SUCCESS_LATCH.countDown();
}

@NonBlocking
@Scheduled(every = "0.5s", identity = "every_void", skipExecutionIf = JobWasExecuted.class)
void everySecond() {
VOID_ON_EVENT_LOOP.set(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
VOID_LATCH.countDown();
}

@Scheduled(every = "0.5s", identity = "every_uni", skipExecutionIf = JobWasExecuted.class)
Uni<Void> everySecondUni() {
UNI_ON_EVENT_LOOP.set(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
UNI_LATCH.countDown();
return Uni.createFrom().voidItem();
}

@Scheduled(every = "0.5s", identity = "every_cs", skipExecutionIf = JobWasExecuted.class)
CompletionStage<Void> everySecondCompletionStage() {
CompletableFuture<Void> ret = new CompletableFuture<Void>();
CS_ON_EVENT_LOOP.set(Context.isOnEventLoopThread() && VertxContext.isOnDuplicatedContext());
CS_LATCH.countDown();
ret.complete(null);
return ret;
}
}

@Singleton
static class JobWasExecuted implements Scheduled.SkipPredicate {

@Override
public boolean test(ScheduledExecution execution) {
switch (execution.getTrigger().getId()) {
case "every_void":
return Jobs.VOID_LATCH.getCount() == 0;
case "every_uni":
return Jobs.UNI_LATCH.getCount() == 0;
case "every_cs":
return Jobs.CS_LATCH.getCount() == 0;
default:
return false;
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@
import io.quarkus.scheduler.runtime.SkipPredicateInvoker;
import io.quarkus.scheduler.runtime.StatusEmitterInvoker;
import io.quarkus.scheduler.runtime.util.SchedulerUtils;
import io.smallrye.common.vertx.VertxContext;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

@Singleton
public class QuartzScheduler implements Scheduler {
Expand All @@ -87,7 +90,8 @@ public class QuartzScheduler implements Scheduler {

public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, SchedulerRuntimeConfig schedulerRuntimeConfig,
Event<SkippedExecution> skippedExecutionEvent, Event<SuccessfulExecution> successfulExecutionEvent,
Event<FailedExecution> failedExecutionEvent, Instance<Job> jobs, Instance<UserTransaction> userTransaction) {
Event<FailedExecution> failedExecutionEvent, Instance<Job> jobs, Instance<UserTransaction> userTransaction,
Vertx vertx) {
enabled = schedulerRuntimeConfig.enabled;
final Duration defaultOverdueGracePeriod = schedulerRuntimeConfig.overdueGracePeriod;
final QuartzRuntimeConfig runtimeConfig = quartzSupport.getRuntimeConfig();
Expand Down Expand Up @@ -122,7 +126,7 @@ public QuartzScheduler(SchedulerContext context, QuartzSupport quartzSupport, Sc
scheduler = schedulerFactory.getScheduler();

// Set custom job factory
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs));
scheduler.setJobFactory(new InvokerJobFactory(scheduledTasks, jobs, vertx));
CronType cronType = context.getCronType();
CronDefinition def = CronDefinitionBuilder.instanceDefinitionFor(cronType);
CronParser parser = new CronParser(def);
Expand Down Expand Up @@ -543,18 +547,33 @@ private Properties getAdditionalConfigurationProperties(String prefix, Map<Strin
static class InvokerJob implements Job {

final QuartzTrigger trigger;
final Vertx vertx;

InvokerJob(QuartzTrigger trigger) {
InvokerJob(QuartzTrigger trigger, Vertx vertx) {
this.trigger = trigger;
this.vertx = vertx;
}

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
if (trigger.invoker != null) { // could be null from previous runs
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
throw new JobExecutionException(e);
if (trigger.invoker.isBlocking()) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
throw new JobExecutionException(e);
}
} else {
VertxContext.getOrCreateDuplicatedContext(vertx).runOnContext(new Handler<Void>() {
@Override
public void handle(Void event) {
try {
trigger.invoker.invoke(new QuartzScheduledExecution(trigger, context));
} catch (Exception e) {
// already logged by the StatusEmitterInvoker
}
}
});
}
}
}
Expand Down Expand Up @@ -632,10 +651,12 @@ static class InvokerJobFactory extends SimpleJobFactory {

final Map<String, QuartzTrigger> scheduledTasks;
final Instance<Job> jobs;
final Vertx vertx;

InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs) {
InvokerJobFactory(Map<String, QuartzTrigger> scheduledTasks, Instance<Job> jobs, Vertx vertx) {
this.scheduledTasks = scheduledTasks;
this.jobs = jobs;
this.vertx = vertx;
}

@SuppressWarnings("unchecked")
Expand All @@ -644,7 +665,7 @@ public Job newJob(TriggerFiredBundle bundle, org.quartz.Scheduler Scheduler) thr
Class<? extends Job> jobClass = bundle.getJobDetail().getJobClass();

if (jobClass.equals(InvokerJob.class)) {
return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()));
return new InvokerJob(scheduledTasks.get(bundle.getJobDetail().getKey().getName()), vertx);
}
if (Subclass.class.isAssignableFrom(jobClass)) {
// Get the original class from an intercepted bean class
Expand Down
4 changes: 2 additions & 2 deletions extensions/scheduler/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@
<dependencies>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core-deployment</artifactId>
<artifactId>quarkus-arc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-arc-deployment</artifactId>
<artifactId>quarkus-vertx-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,21 @@
public final class ScheduledBusinessMethodItem extends MultiBuildItem {

private final BeanInfo bean;

private final List<AnnotationInstance> schedules;

private final MethodInfo method;
private final boolean nonBlocking;

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules) {
this(bean, method, schedules, false);
}

public ScheduledBusinessMethodItem(BeanInfo bean, MethodInfo method, List<AnnotationInstance> schedules,
boolean hasNonBlockingAnnotation) {
this.bean = bean;
this.method = method;
this.schedules = schedules;
this.nonBlocking = hasNonBlockingAnnotation || SchedulerDotNames.COMPLETION_STAGE.equals(method.returnType().name())
|| SchedulerDotNames.UNI.equals(method.returnType().name());
}

/**
Expand All @@ -38,4 +44,8 @@ public List<AnnotationInstance> getSchedules() {
return schedules;
}

public boolean isNonBlocking() {
return nonBlocking;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.scheduler.deployment;

import java.util.concurrent.CompletionStage;

import org.jboss.jandex.DotName;

import io.quarkus.scheduler.Scheduled;
import io.smallrye.common.annotation.NonBlocking;

class SchedulerDotNames {

static final DotName SCHEDULED_NAME = DotName.createSimple(Scheduled.class.getName());
static final DotName SCHEDULES_NAME = DotName.createSimple(Scheduled.Schedules.class.getName());
static final DotName SKIP_NEVER_NAME = DotName.createSimple(Scheduled.Never.class.getName());
static final DotName SKIP_PREDICATE = DotName.createSimple(Scheduled.SkipPredicate.class.getName());
static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
static final DotName UNI = DotName.createSimple("io.smallrye.mutiny.Uni");
static final DotName COMPLETION_STAGE = DotName.createSimple(CompletionStage.class.getName());
static final DotName VOID = DotName.createSimple(Void.class.getName());

}
Loading