Skip to content

Commit

Permalink
fix(core): multiple potential deadlocks and races on the AbstractSche…
Browse files Browse the repository at this point in the history
…duler
  • Loading branch information
loicmathieu authored and tchiotludo committed Jun 5, 2023
1 parent 9b2ae9d commit 227a161
Showing 1 changed file with 50 additions and 39 deletions.
89 changes: 50 additions & 39 deletions core/src/main/java/io/kestra/core/schedulers/AbstractScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,29 @@ public abstract class AbstractScheduler implements Scheduler {
private final MetricRegistry metricRegistry;
private final ConditionService conditionService;
private final TaskDefaultService taskDefaultService;

protected SchedulerExecutionStateInterface executionState;
protected SchedulerTriggerStateInterface triggerState;
protected Boolean isReady = false;

private final ScheduledExecutorService scheduleExecutor = Executors.newSingleThreadScheduledExecutor();
private final ListeningExecutorService cachedExecutor;
private final Map<String, ZonedDateTime> lastEvaluate = new ConcurrentHashMap<>();

// The evaluateRunningLock must be used when accessing evaluateRunning or evaluateRunningCount
private final Object evaluateRunningLock = new Object();
private final Map<String, ZonedDateTime> evaluateRunning = new ConcurrentHashMap<>();
private final Map<String, AtomicInteger> evaluateRunningCount = new ConcurrentHashMap<>();

// The triggerStateSavedLock must be used when accessing triggerStateSaved
private final Object triggerStateSavedLock = new Object();
private final Map<String, Trigger> triggerStateSaved = new ConcurrentHashMap<>();
protected SchedulerTriggerStateInterface triggerState;

@Getter
private List<FlowWithTrigger> schedulable = new ArrayList<>();

// schedulable and schedulableNextDate must be volatile and their access synchronized as they are updated and read by different threads.
@Getter
private volatile List<FlowWithTrigger> schedulable = new ArrayList<>();
@Getter
private Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new HashMap<>();
private volatile Map<String, FlowWithPollingTriggerNextDate> schedulableNextDate = new HashMap<>();

@SuppressWarnings("unchecked")
@Inject
Expand Down Expand Up @@ -127,13 +133,13 @@ public void run() {

// remove trigger on flow update
this.flowListeners.listen((flow, previous) -> {
synchronized (this) {
synchronized (triggerStateSavedLock) {
if (flow.isDeleted()) {
ListUtils.emptyOnNull(flow.getTriggers())
.forEach(abstractTrigger -> {
Trigger trigger = Trigger.of(flow, abstractTrigger);
triggerStateSaved.remove(trigger.uid());

triggerStateSaved.remove(trigger.uid());
triggerQueue.delete(trigger);
});
} else if (previous != null) {
Expand All @@ -150,8 +156,9 @@ public void run() {
});
}

private void computeSchedulable(List<Flow> flows) {
schedulableNextDate = new HashMap<>();
// must be synchronized as it updates schedulableNextDate and schedulable, and will be executed on the flow listener thread
private synchronized void computeSchedulable(List<Flow> flows) {
this.schedulableNextDate = new HashMap<>();

this.schedulable = flows
.stream()
Expand Down Expand Up @@ -186,17 +193,17 @@ private void handle() {

ZonedDateTime now = now();

synchronized (this) {
if (log.isDebugEnabled()) {
log.debug(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}
if (log.isDebugEnabled()) {
log.debug(
"Scheduler next iteration for {} with {} schedulables of {} flows",
now,
schedulable.size(),
this.flowListeners.flows().size()
);
}

// get all that is ready from evaluation
synchronized (this) {
// get all triggers that are ready for evaluation
List<FlowWithPollingTriggerNextDate> readyForEvaluate = schedulable
.stream()
.filter(f -> conditionService.isValid(f.getFlow(), f.getTrigger(), f.getConditionContext()))
Expand All @@ -219,17 +226,15 @@ private void handle() {
.filter(f -> this.isEvaluationInterval(f, now))
.filter(f -> this.isExecutionNotRunning(f, now))
.map(f -> {
synchronized (this) {
Trigger lastTrigger = this.getLastTrigger(f, now);
Trigger lastTrigger = this.getLastTrigger(f, now);

return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
}
return FlowWithPollingTriggerNextDate.of(
f,
f.getPollingTrigger().nextEvaluationDate(f.getConditionContext(), Optional.of(lastTrigger))
);
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
.toList();

if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -320,7 +325,8 @@ private void handleEvaluatePollingTriggerResult(SchedulerExecutionWithTrigger re
}

private void addToRunning(TriggerContext triggerContext, ZonedDateTime now) {
synchronized (this) {
synchronized (evaluateRunningLock) {
// TODO see if we can move this to the time we first met a new trigger to avoid calling it at each trigger execution
this.evaluateRunningCount.computeIfAbsent(triggerContext.uid(), s -> metricRegistry
.gauge(MetricRegistry.SCHEDULER_EVALUATE_RUNNING_COUNT, new AtomicInteger(0), metricRegistry.tags(triggerContext)));

Expand All @@ -331,7 +337,7 @@ private void addToRunning(TriggerContext triggerContext, ZonedDateTime now) {


private void removeFromRunning(TriggerContext triggerContext) {
synchronized (this) {
synchronized (evaluateRunningLock) {
if (this.evaluateRunning.remove(triggerContext.uid()) == null) {
throw new IllegalStateException("Can't remove trigger '" + triggerContext.uid() + "' from running");
}
Expand Down Expand Up @@ -437,18 +443,21 @@ private Trigger getLastTrigger(FlowWithPollingTrigger f, ZonedDateTime now) {
.updatedDate(Instant.now())
.build();

// TODO ask ludo about the comment below as it doesn't seem to be accurate
// no saved trigger was found, so no execution was ever started: create a trigger context with a previous date in the past.
// this handles the edge case where the scheduler evaluation loop crosses a second boundary
// between its start and its end
if (triggerStateSaved.containsKey(build.uid())) {
Trigger cachedTrigger = triggerStateSaved.get(build.uid());
synchronized (triggerStateSavedLock) {
if (triggerStateSaved.containsKey(build.uid())) {
Trigger cachedTrigger = triggerStateSaved.get(build.uid());

triggerState.save(build);
triggerStateSaved.remove(build.uid());
triggerState.save(build);
triggerStateSaved.remove(build.uid());

return cachedTrigger;
} else {
triggerStateSaved.put(build.uid(), build);
return cachedTrigger;
} else {
triggerStateSaved.put(build.uid(), build);
}
}

return build;
Expand Down Expand Up @@ -482,14 +491,16 @@ private boolean isEvaluationInterval(FlowWithPollingTrigger flowWithPollingTrigg
return result;
}

protected synchronized void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger) {
protected void saveLastTriggerAndEmitExecution(SchedulerExecutionWithTrigger executionWithTrigger) {
Trigger trigger = Trigger.of(
executionWithTrigger.getTriggerContext(),
executionWithTrigger.getExecution()
);

this.triggerState.save(trigger);
this.executionQueue.emit(executionWithTrigger.getExecution());
synchronized (triggerStateSavedLock) {
this.triggerState.save(trigger);
this.executionQueue.emit(executionWithTrigger.getExecution());
}
}

private static ZonedDateTime now() {
Expand Down

0 comments on commit 227a161

Please sign in to comment.