From 3056e9c40208b3bde9dfd1fd2dd3cc432f6e9232 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Fri, 13 Dec 2024 10:50:52 +0100 Subject: [PATCH] feat(core,jdbc): small trigger / scheduler improvements --- .../io/kestra/core/schedulers/AbstractScheduler.java | 5 ----- .../kestra/core/schedulers/DefaultScheduleContext.java | 9 ++++++++- .../core/schedulers/ScheduleContextInterface.java | 10 ++++++++++ .../schedulers/SchedulerTriggerStateInterface.java | 9 ++++----- .../kestra/core/schedulers/SchedulerStreamingTest.java | 4 ++++ .../core/schedulers/SchedulerTriggerChangeTest.java | 4 ++++ .../java/io/kestra/plugin/core/http/TriggerTest.java | 6 ++++++ .../main/java/io/kestra/jdbc/runner/JdbcScheduler.java | 2 +- .../io/kestra/jdbc/runner/JdbcSchedulerContext.java | 9 +++------ .../kestra/jdbc/runner/JdbcSchedulerTriggerState.java | 3 --- .../runner/memory/MemorySchedulerTriggerState.java | 3 --- 11 files changed, 40 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java index 473ddb77070..385032ec627 100644 --- a/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java +++ b/core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java @@ -432,11 +432,6 @@ private void handle() { ) .build() ) - .peek(f -> { - if (f.getTriggerContext().getEvaluateRunningDate() != null || !isExecutionNotRunning(f)) { - this.triggerState.unlock(f.getTriggerContext()); - } - }) .filter(f -> f.getTriggerContext().getEvaluateRunningDate() == null) .filter(this::isExecutionNotRunning) .map(FlowWithWorkerTriggerNextDate::of) diff --git a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduleContext.java b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduleContext.java index 7e0dd7838f3..6ebe1658e7c 100644 --- a/core/src/main/java/io/kestra/core/schedulers/DefaultScheduleContext.java +++ b/core/src/main/java/io/kestra/core/schedulers/DefaultScheduleContext.java @@ -1,4 +1,11 @@ package io.kestra.core.schedulers; +import java.util.function.Consumer; + // For tests purpose -public class DefaultScheduleContext implements ScheduleContextInterface {} +public class DefaultScheduleContext implements ScheduleContextInterface { + @Override + public void doInTransaction(Consumer consumer) { + consumer.accept(this); + } +} diff --git a/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java b/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java index d58e0f864c7..83d2ae8e8ce 100644 --- a/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java +++ b/core/src/main/java/io/kestra/core/schedulers/ScheduleContextInterface.java @@ -1,4 +1,14 @@ package io.kestra.core.schedulers; +import java.util.function.Consumer; + +/** + * This context is used by the Scheduler to allow evaluating and updating triggers in a transaction from the main evaluation loop. + * See AbstractScheduler.handle(). + */ public interface ScheduleContextInterface { + /** + * Do trigger retrieval and updating in a single transaction. + */ + void doInTransaction(Consumer consumer); } diff --git a/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java b/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java index 8f95d3354a4..3b38f417072 100644 --- a/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java +++ b/core/src/main/java/io/kestra/core/schedulers/SchedulerTriggerStateInterface.java @@ -24,15 +24,14 @@ public interface SchedulerTriggerStateInterface { Trigger update(Flow flow, AbstractTrigger abstractTrigger, ConditionContext conditionContext) throws Exception; - List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext); /** - * Required for Kafka + * Used by the JDBC implementation: find triggers in all tenants. */ - List findByNextExecutionDateReadyForGivenFlows(List flows, ZonedDateTime now, ScheduleContextInterface scheduleContext); + List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now, ScheduleContextInterface scheduleContext); /** - * Required for Kafka + * Used by the Kafka implementation: find triggers in the scheduler assigned flow (as in Kafka partition assignment). */ - void unlock(Trigger trigger); + List findByNextExecutionDateReadyForGivenFlows(List flows, ZonedDateTime now, ScheduleContextInterface scheduleContext); } diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java index e44e13d94ec..904acbd5d4b 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerStreamingTest.java @@ -42,6 +42,9 @@ public class SchedulerStreamingTest extends AbstractSchedulerTest { @Inject protected SchedulerTriggerStateInterface triggerState; + @Inject + protected SchedulerExecutionStateInterface executionState; + private static Flow createFlow(Boolean failed) { RealtimeUnitTest schedule = RealtimeUnitTest.builder() .id("stream") @@ -75,6 +78,7 @@ private void run(Flow flow, CountDownLatch queueCount, Consumer> AbstractScheduler scheduler = new DefaultScheduler( applicationContext, flowListenersServiceSpy, + executionState, triggerState ); Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null) diff --git a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java b/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java index 73e35b0f19f..31fbbbf1689 100644 --- a/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java +++ b/core/src/test/java/io/kestra/core/schedulers/SchedulerTriggerChangeTest.java @@ -52,6 +52,9 @@ public class SchedulerTriggerChangeTest extends AbstractSchedulerTest { @Inject protected SchedulerTriggerStateInterface triggerState; + @Inject + protected SchedulerExecutionStateInterface executionState; + public static Flow createFlow(Duration sleep) { SleepTriggerTest schedule = SleepTriggerTest.builder() .id("sleep") @@ -101,6 +104,7 @@ void run() throws Exception { AbstractScheduler scheduler = new DefaultScheduler( applicationContext, flowListenersService, + executionState, triggerState ); Worker worker = applicationContext.createBean(TestMethodScopedWorker.class, IdUtils.create(), 8, null) diff --git a/core/src/test/java/io/kestra/plugin/core/http/TriggerTest.java b/core/src/test/java/io/kestra/plugin/core/http/TriggerTest.java index 2269a25aee9..e9d5fbc27e9 100644 --- a/core/src/test/java/io/kestra/plugin/core/http/TriggerTest.java +++ b/core/src/test/java/io/kestra/plugin/core/http/TriggerTest.java @@ -7,6 +7,7 @@ import io.kestra.core.runners.Worker; import io.kestra.core.schedulers.AbstractScheduler; import io.kestra.core.schedulers.DefaultScheduler; +import io.kestra.core.schedulers.SchedulerExecutionStateInterface; import io.kestra.core.schedulers.SchedulerTriggerStateInterface; import io.kestra.core.services.FlowListenersInterface; import io.kestra.core.utils.IdUtils; @@ -31,6 +32,9 @@ class TriggerTest { @Inject private SchedulerTriggerStateInterface triggerState; + @Inject + protected SchedulerExecutionStateInterface executionState; + @Inject private FlowListenersInterface flowListenersService; @@ -51,6 +55,7 @@ void trigger() throws Exception { AbstractScheduler scheduler = new DefaultScheduler( this.applicationContext, this.flowListenersService, + this.executionState, this.triggerState ); Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null); @@ -89,6 +94,7 @@ void trigger_EncryptedBody() throws Exception { AbstractScheduler scheduler = new DefaultScheduler( this.applicationContext, this.flowListenersService, + this.executionState, this.triggerState ); ) { diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java index 9191d6d36a0..37b56ad4a57 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcScheduler.java @@ -100,7 +100,7 @@ public void run() { public void handleNext(List flows, ZonedDateTime now, BiConsumer, ScheduleContextInterface> consumer) { JdbcSchedulerContext schedulerContext = new JdbcSchedulerContext(this.dslContextWrapper); - schedulerContext.startTransaction(scheduleContextInterface -> { + schedulerContext.doInTransaction(scheduleContextInterface -> { List triggers = this.triggerState.findByNextExecutionDateReadyForAllTenants(now, scheduleContextInterface); consumer.accept(triggers, scheduleContextInterface); diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java index 4c3935e84d0..2dcad9a360e 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerContext.java @@ -18,17 +18,14 @@ public JdbcSchedulerContext(JooqDSLContextWrapper dslContextWrapper) { this.dslContextWrapper = dslContextWrapper; } - public void startTransaction(Consumer consumer) { + @Override + public void doInTransaction(Consumer consumer) { this.dslContextWrapper.transaction(configuration -> { this.context = DSL.using(configuration); consumer.accept(this); - this.commit(); + this.context.commit(); }); } - - public void commit() { - this.context.commit(); - } } \ No newline at end of file diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java index 4f3f6c15189..8f47587c6d6 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcSchedulerTriggerState.java @@ -84,7 +84,4 @@ public List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now public List findByNextExecutionDateReadyForGivenFlows(List flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) { throw new NotImplementedException(); } - - @Override - public void unlock(Trigger trigger) {} } diff --git a/runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java b/runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java index b5b4442d1e3..481c1d5a7a7 100644 --- a/runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java +++ b/runner-memory/src/main/java/io/kestra/runner/memory/MemorySchedulerTriggerState.java @@ -79,7 +79,4 @@ public List findByNextExecutionDateReadyForAllTenants(ZonedDateTime now public List findByNextExecutionDateReadyForGivenFlows(List flows, ZonedDateTime now, ScheduleContextInterface scheduleContext) { throw new NotImplementedException(); } - - @Override - public void unlock(Trigger trigger) {} }