From 3851f5a156b8ac794fa446696e5fc916072cd079 Mon Sep 17 00:00:00 2001 From: Hilbrand Bouwkamp Date: Sat, 27 Feb 2021 11:21:10 +0100 Subject: [PATCH] [scheduler] Mitigation fix for earlier triggers #1976 (#2190) Some users reported the scheduler to be run to earlier. It actual is known the Java scheduler can drift. To handle this problem 2 changes have been made: 1. If the scheduler triggers > 2 sec before expected end time it will reschedule. 2. The scheduler keeps track of the timestamp the job should run. For recurring schedulers, like cron, when the next job is run it offsets of the calculated time, and not the actual time. This guaranties that the next scheduled time is actual the next scheduled time and not the same end time in case the scheduler would trigger to early. Signed-off-by: Hilbrand Bouwkamp GitOrigin-RevId: 7fef42c6c8c817cc419efe93a1f19a15cbee634a --- .../scheduler/ScheduledCompletableFuture.java | 6 + .../internal/scheduler/CronSchedulerImpl.java | 4 +- .../internal/scheduler/SchedulerImpl.java | 104 ++++++++++++------ .../internal/scheduler/CronAdjusterTest.java | 2 +- .../internal/scheduler/SchedulerImplTest.java | 70 +++++++++++- 5 files changed, 147 insertions(+), 39 deletions(-) diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/scheduler/ScheduledCompletableFuture.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/scheduler/ScheduledCompletableFuture.java index 706ca5cd2b3..10ff7ecc950 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/scheduler/ScheduledCompletableFuture.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/scheduler/ScheduledCompletableFuture.java @@ -12,6 +12,7 @@ */ package org.openhab.core.scheduler; +import java.time.ZonedDateTime; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledFuture; @@ -29,4 +30,9 @@ public interface ScheduledCompletableFuture extends ScheduledFuture { * @return Returns the {@link CompletableFuture} associated with the scheduled job. */ CompletableFuture getPromise(); + + /** + * @return Returns the timestamp the jobs is scheduled to run at. + */ + ZonedDateTime getScheduledTime(); } diff --git a/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/CronSchedulerImpl.java b/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/CronSchedulerImpl.java index 8e97eba0484..dd435cff178 100644 --- a/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/CronSchedulerImpl.java +++ b/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/CronSchedulerImpl.java @@ -65,9 +65,7 @@ public CronSchedulerImpl(final @Reference Scheduler scheduler) { public ScheduledCompletableFuture<@Nullable Void> schedule(CronJob job, Map config, String cronExpression) { final CronAdjuster cronAdjuster = new CronAdjuster(cronExpression); - final SchedulerRunnable runnable = () -> { - job.run(config); - }; + final SchedulerRunnable runnable = () -> job.run(config); if (cronAdjuster.isReboot()) { return scheduler.at(runnable, Instant.ofEpochMilli(1)); diff --git a/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/SchedulerImpl.java b/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/SchedulerImpl.java index af5094e3c4b..47c03ed3910 100644 --- a/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/SchedulerImpl.java +++ b/bundles/org.opensmarthouse.core.scheduler.core/src/main/java/org/openhab/core/internal/scheduler/SchedulerImpl.java @@ -12,10 +12,11 @@ */ package org.openhab.core.internal.scheduler; -import java.time.Clock; import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; +import java.time.temporal.ChronoUnit; import java.time.temporal.Temporal; import java.time.temporal.TemporalAdjuster; import java.util.concurrent.Callable; @@ -52,10 +53,10 @@ public class SchedulerImpl implements Scheduler { private static final String SCHEDULER_THREAD_POOL = "scheduler"; + private static final int ALLOWED_DEVIATION_MILLISECONDS = 2000; private final Logger logger = LoggerFactory.getLogger(SchedulerImpl.class); - private final Clock clock = Clock.systemDefaultZone(); private final ScheduledExecutorService executor = ThreadPoolManager.getScheduledPool(SCHEDULER_THREAD_POOL); @Override @@ -66,23 +67,37 @@ public ScheduledCompletableFuture after(Duration duration) { @Override public ScheduledCompletableFuture after(Callable callable, Duration duration) { - return afterInternal(new ScheduledCompletableFutureOnce<>(), callable, duration); + return afterInternal(new ScheduledCompletableFutureOnce<>(duration), callable); } private ScheduledCompletableFutureOnce afterInternal(ScheduledCompletableFutureOnce deferred, - Callable callable, Duration duration) { + Callable callable) { + final long duration = Math.max(100, + deferred.getScheduledTime().minus(currentTimeMillis(), ChronoUnit.MILLIS).toInstant().toEpochMilli()); final ScheduledFuture future = executor.schedule(() -> { try { - deferred.complete(callable.call()); + final long timeLeft = deferred.getDelay(TimeUnit.MILLISECONDS); + + if (timeLeft > ALLOWED_DEVIATION_MILLISECONDS) { + logger.trace("Scheduled task is re-scheduled because the scheduler ran {} milliseconds to early.", + timeLeft); + afterInternal(deferred, callable); + } else { + logger.trace("Scheduled task is run now."); + deferred.complete(callable.call()); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { logger.warn("Scheduled job failed and stopped", e); deferred.completeExceptionally(e); } - }, Math.max(0, duration.toMillis()), TimeUnit.MILLISECONDS); - deferred.setInstant(duration); + }, duration, TimeUnit.MILLISECONDS); + if (logger.isTraceEnabled()) { + logger.trace("Scheduled a task to run in {} seconds.", TimeUnit.MILLISECONDS.toSeconds(duration)); + } deferred.exceptionally(e -> { + logger.trace("Scheduled task stopped with exception ", e); if (e instanceof CancellationException) { future.cancel(true); } @@ -99,12 +114,12 @@ public ScheduledCompletableFuture before(CompletableFuture promise, Du runnable.run(); } }; - final ScheduledCompletableFutureOnce wrappedPromise = new ScheduledCompletableFutureOnce<>(); + final ScheduledCompletableFutureOnce wrappedPromise = new ScheduledCompletableFutureOnce<>(timeout); Callable callable = () -> { wrappedPromise.completeExceptionally(new TimeoutException()); return null; }; - final ScheduledCompletableFutureOnce afterPromise = afterInternal(wrappedPromise, callable, timeout); + final ScheduledCompletableFutureOnce afterPromise = afterInternal(wrappedPromise, callable); wrappedPromise.exceptionally(e -> { if (e instanceof CancellationException) { // Also cancel the scheduled timer if returned completable future is cancelled. @@ -128,44 +143,47 @@ public ScheduledCompletableFuture at(Instant instant) { @Override public ScheduledCompletableFuture at(Callable callable, Instant instant) { - return atInternal(new ScheduledCompletableFutureOnce<>(), callable, instant); + return atInternal( + new ScheduledCompletableFutureOnce<>(ZonedDateTime.ofInstant(instant, ZoneId.systemDefault())), + callable); } private ScheduledCompletableFuture atInternal(ScheduledCompletableFutureOnce deferred, - Callable callable, Instant instant) { - final long delay = instant.toEpochMilli() - System.currentTimeMillis(); - - return afterInternal(deferred, callable, Duration.ofMillis(delay)); + Callable callable) { + return afterInternal(deferred, callable); } @Override public ScheduledCompletableFuture schedule(SchedulerRunnable runnable, TemporalAdjuster temporalAdjuster) { - final ScheduledCompletableFutureRecurring schedule = new ScheduledCompletableFutureRecurring<>(); + final ScheduledCompletableFutureRecurring schedule = new ScheduledCompletableFutureRecurring<>( + ZonedDateTime.now()); schedule(schedule, runnable, temporalAdjuster); return schedule; } - private void schedule(ScheduledCompletableFutureRecurring schedule, SchedulerRunnable runnable, + private void schedule(ScheduledCompletableFutureRecurring recurringSchedule, SchedulerRunnable runnable, TemporalAdjuster temporalAdjuster) { - final Temporal newTime = ZonedDateTime.now(clock).with(temporalAdjuster); - final ScheduledCompletableFutureOnce deferred = new ScheduledCompletableFutureOnce<>(); + final Temporal newTime = recurringSchedule.getScheduledTime().with(temporalAdjuster); + final ScheduledCompletableFutureOnce deferred = new ScheduledCompletableFutureOnce<>( + ZonedDateTime.from(newTime)); deferred.thenAccept(v -> { if (temporalAdjuster instanceof SchedulerTemporalAdjuster) { - SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster; + final SchedulerTemporalAdjuster schedulerTemporalAdjuster = (SchedulerTemporalAdjuster) temporalAdjuster; + if (!schedulerTemporalAdjuster.isDone(newTime)) { - schedule(schedule, runnable, temporalAdjuster); + schedule(recurringSchedule, runnable, temporalAdjuster); return; } } - schedule.complete(v); + recurringSchedule.complete(v); }); - schedule.setScheduledPromise(deferred); + recurringSchedule.setScheduledPromise(deferred); atInternal(deferred, () -> { runnable.run(); return null; - }, Instant.from(newTime)); + }); } /** @@ -178,7 +196,8 @@ private void schedule(ScheduledCompletableFutureRecurring schedule, Sched private static class ScheduledCompletableFutureRecurring extends ScheduledCompletableFutureOnce { private @Nullable volatile ScheduledCompletableFuture scheduledPromise; - public ScheduledCompletableFutureRecurring() { + public ScheduledCompletableFutureRecurring(ZonedDateTime scheduledTime) { + super(scheduledTime); exceptionally(e -> { synchronized (this) { if (e instanceof CancellationException) { @@ -209,7 +228,12 @@ void setScheduledPromise(ScheduledCompletableFuture future) { @Override public long getDelay(@Nullable TimeUnit timeUnit) { - return scheduledPromise == null ? 0 : scheduledPromise.getDelay(timeUnit); + return scheduledPromise != null ? scheduledPromise.getDelay(timeUnit) : 0; + } + + @Override + public ZonedDateTime getScheduledTime() { + return scheduledPromise != null ? scheduledPromise.getScheduledTime() : super.getScheduledTime(); } } @@ -220,7 +244,15 @@ public long getDelay(@Nullable TimeUnit timeUnit) { */ private static class ScheduledCompletableFutureOnce extends CompletableFuture implements ScheduledCompletableFuture { - private @Nullable Instant instant; + private ZonedDateTime scheduledTime; + + public ScheduledCompletableFutureOnce(Duration duration) { + this(ZonedDateTime.now().plusNanos(duration.toNanos())); + } + + public ScheduledCompletableFutureOnce(ZonedDateTime scheduledTime) { + this.scheduledTime = scheduledTime; + } @Override public CompletableFuture getPromise() { @@ -229,21 +261,31 @@ public CompletableFuture getPromise() { @Override public long getDelay(@Nullable TimeUnit timeUnit) { - if (timeUnit == null || instant == null) { + ZonedDateTime scheduledTime = this.scheduledTime; + if (timeUnit == null) { return 0; } - long remaining = instant.toEpochMilli() - System.currentTimeMillis(); + long remaining = scheduledTime.toInstant().toEpochMilli() - System.currentTimeMillis(); return timeUnit.convert(remaining, TimeUnit.MILLISECONDS); } @Override public int compareTo(@Nullable Delayed timeUnit) { - return Long.compare(getDelay(TimeUnit.MILLISECONDS), timeUnit.getDelay(TimeUnit.MILLISECONDS)); + return timeUnit == null ? -1 + : Long.compare(getDelay(TimeUnit.MILLISECONDS), timeUnit.getDelay(TimeUnit.MILLISECONDS)); } - protected void setInstant(Duration duration) { - this.instant = Instant.now().plusMillis(duration.toMillis()); + @Override + public ZonedDateTime getScheduledTime() { + return scheduledTime; } } + + /** + * Wraps the system call to get the current time to be able to manipulate it in a unit test. + */ + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } } diff --git a/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/CronAdjusterTest.java b/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/CronAdjusterTest.java index e0cb6a705fa..0c871a61b01 100644 --- a/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/CronAdjusterTest.java +++ b/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/CronAdjusterTest.java @@ -183,7 +183,7 @@ public static Collection arguments() { @ParameterizedTest @MethodSource("arguments") - @Timeout(value = 1, unit = TimeUnit.SECONDS) + @Timeout(value = 2, unit = TimeUnit.SECONDS) public void testCronExpression(String in, String cron, String[] outs) { final CronAdjuster cronAdjuster = new CronAdjuster(cron); Temporal ldt = LocalDateTime.parse(in); diff --git a/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/SchedulerImplTest.java b/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/SchedulerImplTest.java index 7c2202f86ac..cd541f89a99 100644 --- a/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/SchedulerImplTest.java +++ b/bundles/org.opensmarthouse.core.scheduler.core/src/test/java/org/openhab/core/internal/scheduler/SchedulerImplTest.java @@ -18,6 +18,8 @@ import java.lang.reflect.InvocationTargetException; import java.time.Duration; import java.time.Instant; +import java.time.ZonedDateTime; +import java.time.chrono.ChronoZonedDateTime; import java.time.temporal.ChronoUnit; import java.time.temporal.Temporal; import java.util.concurrent.Callable; @@ -29,6 +31,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -183,10 +186,11 @@ public void testAfterTimeoutException() throws Throwable { @Timeout(value = 1, unit = TimeUnit.SECONDS) public void testSchedule() throws InterruptedException { Semaphore s = new Semaphore(0); - TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster(); + TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter(); scheduler.schedule(s::release, temporalAdjuster); s.acquire(3); Thread.sleep(300); // wait a little longer to see if not more are scheduled. + assertEquals(0, s.availablePermits(), "Scheduler should not have released more after done"); assertEquals(3, temporalAdjuster.getCount(), "Scheduler should have run 3 times"); } @@ -195,7 +199,7 @@ public void testSchedule() throws InterruptedException { @Timeout(value = 1, unit = TimeUnit.SECONDS) public void testScheduleCancel() throws InterruptedException { Semaphore s = new Semaphore(0); - TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster(); + TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter(); ScheduledCompletableFuture schedule = scheduler.schedule(s::release, temporalAdjuster); s.acquire(1); Thread.sleep(50); @@ -209,7 +213,7 @@ public void testScheduleCancel() throws InterruptedException { @Timeout(value = 1, unit = TimeUnit.SECONDS) public void testScheduleException() throws InterruptedException { Semaphore s = new Semaphore(0); - TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster(); + TestSchedulerWithCounter temporalAdjuster = new TestSchedulerWithCounter(); SchedulerRunnable runnable = () -> { // Pass a exception not very likely thrown by the scheduler it self to avoid missing real exceptions. throw new FileNotFoundException("testBeforeTimeoutException"); @@ -265,7 +269,35 @@ public void testCompareTo() throws InterruptedException { future2.cancel(true); } - private final class TestSchedulerTemporalAdjuster implements SchedulerTemporalAdjuster { + /** + * This tests if the reschedule works correctly. + * It does this by manipulating the duration calculation of the next step. + * This causes the scheduler to reschedule the next call to early. + * It then should match against the actual expected time and reschedule because the expected time is not reached. + */ + @Test + @Timeout(value = 15, unit = TimeUnit.SECONDS) + public void testEarlyTrigger() throws InterruptedException, ExecutionException { + final TestSchedulerTemporalAdjuster temporalAdjuster = new TestSchedulerTemporalAdjuster(3000); + final AtomicInteger counter = new AtomicInteger(); + final SchedulerImpl scheduler = new SchedulerImpl() { + @Override + protected long currentTimeMillis() { + // Add 3 seconds to let the duration calculation be too short. + // This modification does mean it knows a bit about the internal implementation. + return super.currentTimeMillis() + 3000; + } + }; + final AtomicReference> reference = new AtomicReference<>(); + final ScheduledCompletableFuture future = scheduler.schedule(() -> counter.incrementAndGet(), + temporalAdjuster); + reference.set(future); + future.get(); + assertEquals(3, temporalAdjuster.getCount(), "The next schedule caluclator should have been done 3 times."); + assertEquals(3, counter.get(), "The schedule run method should have been called 3 times."); + } + + private static class TestSchedulerWithCounter implements SchedulerTemporalAdjuster { private final AtomicInteger counter = new AtomicInteger(); @Override @@ -283,4 +315,34 @@ public int getCount() { return counter.get(); } } + + private static class TestSchedulerTemporalAdjuster extends TestSchedulerWithCounter { + private final ZonedDateTime startTime; + private final int duration; + + public TestSchedulerTemporalAdjuster(int duration) { + this.duration = duration; + startTime = ZonedDateTime.now(); + } + + @Override + public Temporal adjustInto(Temporal arg0) { + Temporal now = arg0.plus(100, ChronoUnit.MILLIS); + for (int i = 0; i < 5; i++) { + ZonedDateTime newTime = startTime.plus(duration * i, ChronoUnit.MILLIS); + + if (newTime.isAfter((ChronoZonedDateTime) now)) { + return newTime; + } + + } + throw new IllegalStateException("Test should always find time"); + } + + @Override + public boolean isDone(Temporal temporal) { + super.isDone(temporal); + return ZonedDateTime.now().compareTo(startTime.plus(duration * 3 - 2000, ChronoUnit.MILLIS)) > 0; + } + } }