Skip to content

Commit

Permalink
feat(core,jdbc): small trigger / scheduler improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
loicmathieu committed Dec 13, 2024
1 parent dc7fef2 commit 3056e9c
Show file tree
Hide file tree
Showing 11 changed files with 40 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,6 @@ private void handle() {
)
.build()
)
.peek(f -> {
if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) {
this.triggerState.unlock(f.getTriggerContext());
}
})
.filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null)
.filter(this::isExecutionNotRunning)
.map(FlowWithWorkerTriggerNextDate::of)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
package io.kestra.core.schedulers;

import java.util.function.Consumer;

// For tests purpose
public class DefaultScheduleContext implements ScheduleContextInterface {}
public class DefaultScheduleContext implements ScheduleContextInterface {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
consumer.accept(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
package io.kestra.core.schedulers;

import java.util.function.Consumer;

/**
* This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop.
* See AbstractScheduler.handle().
*/
public interface ScheduleContextInterface {
/**
* Do trigger retrieval and updating in a single transaction.
*/
void doInTransaction(Consumer<ScheduleContextInterface> consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ public interface SchedulerTriggerStateInterface {

Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception;

List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);

/**
* Required for Kafka
* Used by the JDBC implementation: find triggers in all tenants.
*/
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext);

/**
* Required for Kafka
* Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment).
*/
void unlock(Trigger trigger);
List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

private static Flow createFlow(Boolean failed) {
RealtimeUnitTest schedule = RealtimeUnitTest.builder()
.id("stream")
Expand Down Expand Up @@ -75,6 +78,7 @@ private void run(Flow flow, CountDownLatch queueCount, Consumer<List<Execution>>
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersServiceSpy,
executionState,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest {
@Inject
protected SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

public static Flow createFlow(Duration sleep) {
SleepTriggerTest schedule = SleepTriggerTest.builder()
.id("sleep")
Expand Down Expand Up @@ -101,6 +104,7 @@ void run() throws Exception {
AbstractScheduler scheduler = new DefaultScheduler(
applicationContext,
flowListenersService,
executionState,
triggerState
);
Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.runners.Worker;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerExecutionStateInterface;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.services.FlowListenersInterface;
import io.kestra.core.utils.IdUtils;
Expand All @@ -31,6 +32,9 @@ class TriggerTest {
@Inject
private SchedulerTriggerStateInterface triggerState;

@Inject
protected SchedulerExecutionStateInterface executionState;

@Inject
private FlowListenersInterface flowListenersService;

Expand All @@ -51,6 +55,7 @@ void trigger() throws Exception {
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.executionState,
this.triggerState
);
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
Expand Down Expand Up @@ -89,6 +94,7 @@ void trigger_EncryptedBody() throws Exception {
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.executionState,
this.triggerState
);
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void run() {
public void handleNext(List<Flow> flows, ZonedDateTime now, BiConsumer<List<Trigger>, ScheduleContextInterface> consumer) {
JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper);

schedulerContext.startTransaction(scheduleContextInterface -> {
schedulerContext.doInTransaction(scheduleContextInterface -> {
List<Trigger> triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface);

consumer.accept(triggers, scheduleContextInterface);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@ public JdbcSchedulerContext(JooqDSLContextWrapper dslContextWrapper) {
this.dslContextWrapper = dslContextWrapper;
}

public void startTransaction(Consumer<ScheduleContextInterface> consumer) {
@Override
public void doInTransaction(Consumer<ScheduleContextInterface> consumer) {
this.dslContextWrapper.transaction(configuration -> {
this.context = DSL.using(configuration);

consumer.accept(this);

this.commit();
this.context.commit();
});
}

public void commit() {
this.context.commit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,4 @@ public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}

@Override
public void unlock(Trigger trigger) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,4 @@ public List<Trigger> findByNextExecutionDateReadyForAllTenants(ZonedDateTime now
public List<Trigger> findByNextExecutionDateReadyForGivenFlows(List<Flow> flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) {
throw new NotImplementedException();
}

@Override
public void unlock(Trigger trigger) {}
}

0 comments on commit 3056e9c

Please sign in to comment.