diff --git a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java index afbcf4ce..21219ee6 100644 --- a/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java +++ b/src/main/java/org/opensearch/jobscheduler/scheduler/JobScheduler.java @@ -119,13 +119,10 @@ public boolean deschedule(String indexName, String id) { jobInfo.setExpectedPreviousExecutionTime(null); Scheduler.ScheduledCancellable scheduledCancellable = jobInfo.getScheduledCancellable(); - if (scheduledCancellable != null) { - if (scheduledCancellable.cancel()) { - this.scheduledJobInfo.removeJob(indexName, id); - } else { - return false; - } + if (scheduledCancellable != null && !scheduledCancellable.cancel()) { + return false; } + this.scheduledJobInfo.removeJob(indexName, id); return true; } @@ -148,7 +145,18 @@ boolean reschedule( log.info("No next execution time for job {}", jobParameter.getName()); return true; } - Duration duration = Duration.between(this.clock.instant(), nextExecutionTime); + Instant now = this.clock.instant(); + Duration duration = Duration.between(now, nextExecutionTime); + if (duration.isNegative()) { + log.info( + "job {} expected time: {} < current time: {}, setting next execute time to current", + jobParameter.getName(), + nextExecutionTime.toEpochMilli(), + now.toEpochMilli() + ); + nextExecutionTime = now; + duration = Duration.ZERO; + } // Too many jobs start at the same time point will bring burst. Add random jitter delay to spread out load. // Example, if interval is 10 minutes, jitter is 0.6, next job run will be randomly delayed by 0 to 10*0.6 minutes. diff --git a/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java b/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java index 51d4c5f1..a6597b33 100644 --- a/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java +++ b/src/test/java/org/opensearch/jobscheduler/scheduler/JobSchedulerTests.java @@ -143,6 +143,33 @@ public void testReschedule_noEnableTime() { Assert.assertFalse(this.scheduler.reschedule(jobParameter, null, null, dummyVersion, jitterLimit)); } + public void testReschedule_outOfExpectTime() { + Schedule schedule = Mockito.mock(Schedule.class); + ScheduledJobParameter jobParameter = buildScheduledJobParameter( + "job-id", + "dummy job name", + Instant.now().minus(1, ChronoUnit.HOURS), + Instant.now(), + schedule, + false, + 0.6 + ); + JobSchedulingInfo jobSchedulingInfo = new JobSchedulingInfo("job-index", "job-id", jobParameter); + Instant now = Instant.now(); + jobSchedulingInfo.setDescheduled(false); + + Mockito.when(schedule.getNextExecutionTime(Mockito.any())) + .thenReturn(now.minus(10, ChronoUnit.MINUTES)) + .thenReturn(now.plus(2, ChronoUnit.MINUTES)); + + Scheduler.ScheduledCancellable cancellable = Mockito.mock(Scheduler.ScheduledCancellable.class); + Mockito.when(this.threadPool.schedule(Mockito.any(), Mockito.any(), Mockito.anyString())).thenReturn(cancellable); + + Assert.assertTrue(this.scheduler.reschedule(jobParameter, jobSchedulingInfo, null, dummyVersion, jitterLimit)); + Assert.assertEquals(cancellable, jobSchedulingInfo.getScheduledCancellable()); + Mockito.verify(this.threadPool).schedule(Mockito.any(), Mockito.any(), Mockito.anyString()); + } + public void testReschedule_jobDescheduled() { Schedule schedule = Mockito.mock(Schedule.class); ScheduledJobParameter jobParameter = buildScheduledJobParameter(