From e973b8177539e2dc925c571941c2eee9d4147672 Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Tue, 14 Nov 2023 21:05:38 +0200 Subject: [PATCH 1/6] Interval schedule should take start time from the request, should not set it to the current time of request execution. Signed-off-by: Oleg Kravchuk --- .../org/opensearch/indexmanagement/rollup/model/Rollup.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index c45c1d71b..9cf07f9b8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -330,7 +330,7 @@ data class Rollup( // TODO: Make startTime public in Job Scheduler so we can just directly check the value if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) + schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0) } } return Rollup( From d7e0a1ad1320666fa44bc2b3d1dd9f71c8043fbe Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Wed, 22 Nov 2023 01:15:55 +0200 Subject: [PATCH 2/6] Changed the "delayed continuous execution test" to be more expressive about what it should test. Signed-off-by: Oleg Kravchuk --- .../rollup/runner/RollupRunnerIT.kt | 44 ++++++------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 3b173515c..d76f1a40f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -663,40 +663,24 @@ class RollupRunnerIT : RollupRestTestCase() { putDateDocumentInSourceIndex(rollup) // Create rollup job - rollup = createRollup(rollup = rollup, rollupId = rollup.id) + val jobStartTime = Instant.now() + val rollupNow = rollup.copy( + jobSchedule = IntervalSchedule(jobStartTime, 1, ChronoUnit.MINUTES), + jobEnabledTime = jobStartTime + ) + rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id) - var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() - val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli() - val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500) - assertTrue("Delay was not correctly applied", delayIsCorrect) + val nextExecutionTime1 = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The first job execution time should be after [job start time] + [delay].", nextExecutionTime1 >= jobStartTime.toEpochMilli() + delay) + assertTrue("The first job execution time should not be delayed too much after [job start time] + [delay].", nextExecutionTime1 <= jobStartTime.toEpochMilli() + delay + 100) waitFor { - // Wait until half a second before the intended execution time - assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime - 500) - // Still should not have run at this point - assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex)) + assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime1) } - val rollupMetadata = waitFor { - assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) - val rollupJob = getRollup(rollupId = rollup.id) - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertNotNull("Rollup metadata not found", rollupMetadata) - rollupMetadata - } - nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() - val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000 - val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000 - assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect) - val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime - val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime - // Assert that after the window was updated, it falls approximately around 'now' - assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now()) - assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now()) - - // window length should be 5 seconds - val expectedWindowEnd = nextWindowStartTime.plusMillis(5000) - assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime) + + val nextExecutionTime2 = rollup.schedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The second job execution time should be not earlier than a minute after the first execution.", nextExecutionTime2 - nextExecutionTime1 >= 60000) + assertTrue("The second job execution time should not be too delayed after the first execution.", nextExecutionTime2 - nextExecutionTime1 <= 60000 + 100) } fun `test non continuous delay does nothing`() { From d5a97d91fe0d37da7dd41d7c93eef486f8ae1167 Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Wed, 22 Nov 2023 02:18:11 +0200 Subject: [PATCH 3/6] fixed the NPE if schedule.startTime is NULL Signed-off-by: Oleg Kravchuk --- .../org/opensearch/indexmanagement/rollup/model/Rollup.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index 9cf07f9b8..c46a00b36 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -330,7 +330,7 @@ data class Rollup( // TODO: Make startTime public in Job Scheduler so we can just directly check the value if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0) + schedule = IntervalSchedule(schedule.startTime?: Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) } } return Rollup( From 876e1b697840374d0e95662a5811a11a03407173 Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Wed, 22 Nov 2023 02:20:52 +0200 Subject: [PATCH 4/6] fixed the NPE if schedule.startTime is NULL Signed-off-by: Oleg Kravchuk --- .../org/opensearch/indexmanagement/transform/model/Transform.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index db3145e08..a0100e989 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -387,7 +387,7 @@ data class Transform( if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { // we instantiate the start time if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit) + schedule = IntervalSchedule(schedule.startTime?: Instant.now(), schedule.interval, schedule.unit) } // we clear out metadata if its a new job From eb190b96b1a06b75a10827c49eb1814defd6b64e Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Wed, 22 Nov 2023 02:27:47 +0200 Subject: [PATCH 5/6] fixed styling Signed-off-by: Oleg Kravchuk --- .../org/opensearch/indexmanagement/rollup/model/Rollup.kt | 2 +- .../org/opensearch/indexmanagement/transform/model/Transform.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index c46a00b36..72c91cfaa 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -330,7 +330,7 @@ data class Rollup( // TODO: Make startTime public in Job Scheduler so we can just directly check the value if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime?: Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) + schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) } } return Rollup( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index a0100e989..3e3151e18 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -387,7 +387,7 @@ data class Transform( if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { // we instantiate the start time if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime?: Instant.now(), schedule.interval, schedule.unit) + schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit) } // we clear out metadata if its a new job From 4758e480c9c94d30d3b2956372521d3f0bfad7b0 Mon Sep 17 00:00:00 2001 From: Oleg Kravchuk Date: Wed, 22 Nov 2023 15:08:06 +0200 Subject: [PATCH 6/6] - removed null checks from RollUp and Transforms - fixed comments in the "delayed execution" test Signed-off-by: Oleg Kravchuk --- .../indexmanagement/rollup/model/Rollup.kt | 2 +- .../transform/model/Transform.kt | 2 +- .../rollup/runner/RollupRunnerIT.kt | 23 +++++++++++-------- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index 72c91cfaa..9cf07f9b8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -330,7 +330,7 @@ data class Rollup( // TODO: Make startTime public in Job Scheduler so we can just directly check the value if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) + schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0) } } return Rollup( diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt index 3e3151e18..db3145e08 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/model/Transform.kt @@ -387,7 +387,7 @@ data class Transform( if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { // we instantiate the start time if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit) + schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit) } // we clear out metadata if its a new job diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index d76f1a40f..73e1ca97b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -639,7 +639,7 @@ class RollupRunnerIT : RollupRestTestCase() { // Tests that a continuous rollup will not be processed until the end of the interval plus delay passes fun `test delaying continuous execution`() { val indexName = "test_index_runner_eighth" - val delay: Long = 15000 + val delay: Long = 7_500 // Define rollup var rollup = randomRollup().copy( id = "$testName-4", @@ -670,17 +670,22 @@ class RollupRunnerIT : RollupRestTestCase() { ) rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id) - val nextExecutionTime1 = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli() - assertTrue("The first job execution time should be after [job start time] + [delay].", nextExecutionTime1 >= jobStartTime.toEpochMilli() + delay) - assertTrue("The first job execution time should not be delayed too much after [job start time] + [delay].", nextExecutionTime1 <= jobStartTime.toEpochMilli() + delay + 100) + val expectedFirstExecutionTime = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The first job execution time should be equal [job start time] + [delay].", expectedFirstExecutionTime == jobStartTime.toEpochMilli() + delay) - waitFor { - assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime1) + waitFor() { + assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertNotNull("Rollup metadata not found", rollupMetadata) } - val nextExecutionTime2 = rollup.schedule.getNextExecutionTime(null).toEpochMilli() - assertTrue("The second job execution time should be not earlier than a minute after the first execution.", nextExecutionTime2 - nextExecutionTime1 >= 60000) - assertTrue("The second job execution time should not be too delayed after the first execution.", nextExecutionTime2 - nextExecutionTime1 <= 60000 + 100) + val now = Instant.now().toEpochMilli() + assertTrue("The first job execution must happen after [job start time] + [delay]", now > jobStartTime.toEpochMilli() + delay) + + val secondExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The second job execution time should be not earlier than a minute after the first execution.", secondExecutionTime - expectedFirstExecutionTime == 60_000L) } fun `test non continuous delay does nothing`() {