Skip to content

Commit

Permalink
[scheduler] Mitigation fix for earlier triggers openhab#1976 (openhab…
Browse files Browse the repository at this point in the history
…#2190)

Some users reported the scheduler to be run to earlier.
It actual is known the Java scheduler can drift.
To handle this problem 2 changes have been made:
1. If the scheduler triggers > 2 sec before expected end time it will
reschedule.
2. The scheduler keeps track of the timestamp the job should run. For
recurring schedulers, like cron, when the next job is run it offsets of
the calculated time, and not the actual time. This guaranties that the
next scheduled time is actual the next scheduled time and not the same
end time in case the scheduler would trigger to early.

Signed-off-by: Hilbrand Bouwkamp <[email protected]>
GitOrigin-RevId: 7fef42c
  • Loading branch information
Hilbrand authored and splatch committed Jul 11, 2023
1 parent e8afeae commit 3851f5a
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.openhab.core.scheduler;

import java.time.ZonedDateTime;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;

Expand All @@ -29,4 +30,9 @@ public interface ScheduledCompletableFuture<T> extends ScheduledFuture<T> {
* @return Returns the {@link CompletableFuture} associated with the scheduled job.
*/
CompletableFuture<T> getPromise();

/**
* @return Returns the timestamp the jobs is scheduled to run at.
*/
ZonedDateTime getScheduledTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ public CronSchedulerImpl(final @Reference Scheduler scheduler) {
public ScheduledCompletableFuture<@Nullable Void> schedule(CronJob job, Map<String, Object> config,
String cronExpression) {
final CronAdjuster cronAdjuster = new CronAdjuster(cronExpression);
final SchedulerRunnable runnable = () -> {
job.run(config);
};
final SchedulerRunnable runnable = () -> job.run(config);

if (cronAdjuster.isReboot()) {
return scheduler.at(runnable, Instant.ofEpochMilli(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
*/
package org.openhab.core.internal.scheduler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.time.temporal.TemporalAdjuster;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -52,10 +53,10 @@
public class SchedulerImpl implements Scheduler {

private static final String SCHEDULER_THREAD_POOL = "scheduler";
private static final int ALLOWED_DEVIATION_MILLISECONDS = 2000;

private final Logger logger = LoggerFactory.getLogger(SchedulerImpl.class);

private final Clock clock = Clock.systemDefaultZone();
private final ScheduledExecutorService executor = ThreadPoolManager.getScheduledPool(SCHEDULER_THREAD_POOL);

@Override
Expand All @@ -66,23 +67,37 @@ public ScheduledCompletableFuture<Instant> after(Duration duration) {

@Override
public <T> ScheduledCompletableFuture<T> after(Callable<T> callable, Duration duration) {
return afterInternal(new ScheduledCompletableFutureOnce<>(), callable, duration);
return afterInternal(new ScheduledCompletableFutureOnce<>(duration), callable);
}

private <T> ScheduledCompletableFutureOnce<T> afterInternal(ScheduledCompletableFutureOnce<T> deferred,
Callable<T> callable, Duration duration) {
Callable<T> callable) {
final long duration = Math.max(100,
deferred.getScheduledTime().minus(currentTimeMillis(), ChronoUnit.MILLIS).toInstant().toEpochMilli());
final ScheduledFuture<?> future = executor.schedule(() -> {
try {
deferred.complete(callable.call());
final long timeLeft = deferred.getDelay(TimeUnit.MILLISECONDS);

if (timeLeft > ALLOWED_DEVIATION_MILLISECONDS) {
logger.trace("Scheduled task is re-scheduled because the scheduler ran {} milliseconds to early.",
timeLeft);
afterInternal(deferred, callable);
} else {
logger.trace("Scheduled task is run now.");
deferred.complete(callable.call());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.warn("Scheduled job failed and stopped", e);
deferred.completeExceptionally(e);
}
}, Math.max(0, duration.toMillis()), TimeUnit.MILLISECONDS);
deferred.setInstant(duration);
}, duration, TimeUnit.MILLISECONDS);
if (logger.isTraceEnabled()) {
logger.trace("Scheduled a task to run in {} seconds.", TimeUnit.MILLISECONDS.toSeconds(duration));
}
deferred.exceptionally(e -> {
logger.trace("Scheduled task stopped with exception ", e);
if (e instanceof CancellationException) {
future.cancel(true);
}
Expand All @@ -99,12 +114,12 @@ public <T> ScheduledCompletableFuture<T> before(CompletableFuture<T> promise, Du
runnable.run();
}
};
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>();
final ScheduledCompletableFutureOnce<T> wrappedPromise = new ScheduledCompletableFutureOnce<>(timeout);
Callable<T> callable = () -> {
wrappedPromise.completeExceptionally(new TimeoutException());
return null;
};
final ScheduledCompletableFutureOnce<T> afterPromise = afterInternal(wrappedPromise, callable, timeout);
final ScheduledCompletableFutureOnce<T> afterPromise = afterInternal(wrappedPromise, callable);
wrappedPromise.exceptionally(e -> {
if (e instanceof CancellationException) {
// Also cancel the scheduled timer if returned completable future is cancelled.
Expand All @@ -128,44 +143,47 @@ public ScheduledCompletableFuture<Instant> at(Instant instant) {

@Override
public <T> ScheduledCompletableFuture<T> at(Callable<T> callable, Instant instant) {
return atInternal(new ScheduledCompletableFutureOnce<>(), callable, instant);
return atInternal(
new ScheduledCompletableFutureOnce<>(ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())),
callable);
}

private <T> ScheduledCompletableFuture<T> atInternal(ScheduledCompletableFutureOnce<T> deferred,
Callable<T> callable, Instant instant) {
final long delay = instant.toEpochMilli() - System.currentTimeMillis();

return afterInternal(deferred, callable, Duration.ofMillis(delay));
Callable<T> callable) {
return afterInternal(deferred, callable);
}

@Override
public <T> ScheduledCompletableFuture<T> schedule(SchedulerRunnable runnable, TemporalAdjuster temporalAdjuster) {
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>();
final ScheduledCompletableFutureRecurring<T> schedule = new ScheduledCompletableFutureRecurring<>(
ZonedDateTime.now());

schedule(schedule, runnable, temporalAdjuster);
return schedule;
}

private <T> void schedule(ScheduledCompletableFutureRecurring<T> schedule, SchedulerRunnable runnable,
private <T> void schedule(ScheduledCompletableFutureRecurring<T> recurringSchedule, SchedulerRunnable runnable,
TemporalAdjuster temporalAdjuster) {
final Temporal newTime = ZonedDateTime.now(clock).with(temporalAdjuster);
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>();
final Temporal newTime = recurringSchedule.getScheduledTime().with(temporalAdjuster);
final ScheduledCompletableFutureOnce<T> deferred = new ScheduledCompletableFutureOnce<>(
ZonedDateTime.from(newTime));

deferred.thenAccept(v -> {
if (temporalAdjuster instanceof SchedulerTemporalAdjuster) {
SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster;
final SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster;

if (!schedulerTemporalAdjuster.isDone(newTime)) {
schedule(schedule, runnable, temporalAdjuster);
schedule(recurringSchedule, runnable, temporalAdjuster);
return;
}
}
schedule.complete(v);
recurringSchedule.complete(v);
});
schedule.setScheduledPromise(deferred);
recurringSchedule.setScheduledPromise(deferred);
atInternal(deferred, () -> {
runnable.run();
return null;
}, Instant.from(newTime));
});
}

/**
Expand All @@ -178,7 +196,8 @@ private <T> void schedule(ScheduledCompletableFutureRecurring<T> schedule, Sched
private static class ScheduledCompletableFutureRecurring<T> extends ScheduledCompletableFutureOnce<T> {
private @Nullable volatile ScheduledCompletableFuture<T> scheduledPromise;

public ScheduledCompletableFutureRecurring() {
public ScheduledCompletableFutureRecurring(ZonedDateTime scheduledTime) {
super(scheduledTime);
exceptionally(e -> {
synchronized (this) {
if (e instanceof CancellationException) {
Expand Down Expand Up @@ -209,7 +228,12 @@ void setScheduledPromise(ScheduledCompletableFuture<T> future) {

@Override
public long getDelay(@Nullable TimeUnit timeUnit) {
return scheduledPromise == null ? 0 : scheduledPromise.getDelay(timeUnit);
return scheduledPromise != null ? scheduledPromise.getDelay(timeUnit) : 0;
}

@Override
public ZonedDateTime getScheduledTime() {
return scheduledPromise != null ? scheduledPromise.getScheduledTime() : super.getScheduledTime();
}
}

Expand All @@ -220,7 +244,15 @@ public long getDelay(@Nullable TimeUnit timeUnit) {
*/
private static class ScheduledCompletableFutureOnce<T> extends CompletableFuture<T>
implements ScheduledCompletableFuture<T> {
private @Nullable Instant instant;
private ZonedDateTime scheduledTime;

public ScheduledCompletableFutureOnce(Duration duration) {
this(ZonedDateTime.now().plusNanos(duration.toNanos()));
}

public ScheduledCompletableFutureOnce(ZonedDateTime scheduledTime) {
this.scheduledTime = scheduledTime;
}

@Override
public CompletableFuture<T> getPromise() {
Expand All @@ -229,21 +261,31 @@ public CompletableFuture<T> getPromise() {

@Override
public long getDelay(@Nullable TimeUnit timeUnit) {
if (timeUnit == null || instant == null) {
ZonedDateTime scheduledTime = this.scheduledTime;
if (timeUnit == null) {
return 0;
}
long remaining = instant.toEpochMilli() - System.currentTimeMillis();
long remaining = scheduledTime.toInstant().toEpochMilli() - System.currentTimeMillis();

return timeUnit.convert(remaining, TimeUnit.MILLISECONDS);
}

@Override
public int compareTo(@Nullable Delayed timeUnit) {
return Long.compare(getDelay(TimeUnit.MILLISECONDS), timeUnit.getDelay(TimeUnit.MILLISECONDS));
return timeUnit == null ? -1
: Long.compare(getDelay(TimeUnit.MILLISECONDS), timeUnit.getDelay(TimeUnit.MILLISECONDS));
}

protected void setInstant(Duration duration) {
this.instant = Instant.now().plusMillis(duration.toMillis());
@Override
public ZonedDateTime getScheduledTime() {
return scheduledTime;
}
}

/**
* Wraps the system call to get the current time to be able to manipulate it in a unit test.
*/
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static Collection<Object[]> arguments() {

@ParameterizedTest
@MethodSource("arguments")
@Timeout(value = 1, unit = TimeUnit.SECONDS)
@Timeout(value = 2, unit = TimeUnit.SECONDS)
public void testCronExpression(String in, String cron, String[] outs) {
final CronAdjuster cronAdjuster = new CronAdjuster(cron);
Temporal ldt = LocalDateTime.parse(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.chrono.ChronoZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.Temporal;
import java.util.concurrent.Callable;
Expand All @@ -29,6 +31,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -183,10 +186,11 @@ public void testAfterTimeoutException() throws Throwable {
@Timeout(value = 1, unit = TimeUnit.SECONDS)
public void testSchedule() throws InterruptedException {
Semaphore s = new Semaphore(0);
TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster();
TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter();
scheduler.schedule(s::release, temporalAdjuster);
s.acquire(3);
Thread.sleep(300); // wait a little longer to see if not more are scheduled.

assertEquals(0, s.availablePermits(), "Scheduler should not have released more after done");
assertEquals(3, temporalAdjuster.getCount(), "Scheduler should have run 3 times");
}
Expand All @@ -195,7 +199,7 @@ public void testSchedule() throws InterruptedException {
@Timeout(value = 1, unit = TimeUnit.SECONDS)
public void testScheduleCancel() throws InterruptedException {
Semaphore s = new Semaphore(0);
TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster();
TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter();
ScheduledCompletableFuture<Void> schedule = scheduler.schedule(s::release, temporalAdjuster);
s.acquire(1);
Thread.sleep(50);
Expand All @@ -209,7 +213,7 @@ public void testScheduleCancel() throws InterruptedException {
@Timeout(value = 1, unit = TimeUnit.SECONDS)
public void testScheduleException() throws InterruptedException {
Semaphore s = new Semaphore(0);
TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster();
TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter();
SchedulerRunnable runnable = () -> {
// Pass a exception not very likely thrown by the scheduler it self to avoid missing real exceptions.
throw new FileNotFoundException("testBeforeTimeoutException");
Expand Down Expand Up @@ -265,7 +269,35 @@ public void testCompareTo() throws InterruptedException {
future2.cancel(true);
}

private final class TestSchedulerTemporalAdjuster implements SchedulerTemporalAdjuster {
/**
* This tests if the reschedule works correctly.
* It does this by manipulating the duration calculation of the next step.
* This causes the scheduler to reschedule the next call to early.
* It then should match against the actual expected time and reschedule because the expected time is not reached.
*/
@Test
@Timeout(value = 15, unit = TimeUnit.SECONDS)
public void testEarlyTrigger() throws InterruptedException, ExecutionException {
final TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster(3000);
final AtomicInteger counter = new AtomicInteger();
final SchedulerImpl scheduler = new SchedulerImpl() {
@Override
protected long currentTimeMillis() {
// Add 3 seconds to let the duration calculation be too short.
// This modification does mean it knows a bit about the internal implementation.
return super.currentTimeMillis() + 3000;
}
};
final AtomicReference<ScheduledCompletableFuture<Object>> reference = new AtomicReference<>();
final ScheduledCompletableFuture<Object> future = scheduler.schedule(() -> counter.incrementAndGet(),
temporalAdjuster);
reference.set(future);
future.get();
assertEquals(3, temporalAdjuster.getCount(), "The next schedule caluclator should have been done 3 times.");
assertEquals(3, counter.get(), "The schedule run method should have been called 3 times.");
}

private static class TestSchedulerWithCounter implements SchedulerTemporalAdjuster {
private final AtomicInteger counter = new AtomicInteger();

@Override
Expand All @@ -283,4 +315,34 @@ public int getCount() {
return counter.get();
}
}

private static class TestSchedulerTemporalAdjuster extends TestSchedulerWithCounter {
private final ZonedDateTime startTime;
private final int duration;

public TestSchedulerTemporalAdjuster(int duration) {
this.duration = duration;
startTime = ZonedDateTime.now();
}

@Override
public Temporal adjustInto(Temporal arg0) {
Temporal now = arg0.plus(100, ChronoUnit.MILLIS);
for (int i = 0; i < 5; i++) {
ZonedDateTime newTime = startTime.plus(duration * i, ChronoUnit.MILLIS);

if (newTime.isAfter((ChronoZonedDateTime<?>) now)) {
return newTime;
}

}
throw new IllegalStateException("Test should always find time");
}

@Override
public boolean isDone(Temporal temporal) {
super.isDone(temporal);
return ZonedDateTime.now().compareTo(startTime.plus(duration * 3 - 2000, ChronoUnit.MILLIS)) > 0;
}
}
}

0 comments on commit 3851f5a

Please sign in to comment.