Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interval schedule should take start time from the request, should not… #1040

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ data class Rollup(
// TODO: Make startTime public in Job Scheduler so we can just directly check the value
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenlan-amzn I will remove this null check. It is not necessary. And we did not break anything. The schedule is parsed earlier in this method on line 286 , and in this line the NPE is thrown if schedule.startTime == null(https://github.com/opensearch-project/index-management/blob/59cf8dd54b9617f0bc161a0e9b653529442f8e70/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt#L286C1-L286C32)

I checked it. I removed my changes and sent transform with schedule.start_time : null. Got 400 : numeric field may not be null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks, previously I thought this schedule field is optional, but turns out this is a required field, and schedule startTime is also required, which means user has to provide some long values here anyway. So I guess this has been a bad user experience till now, and we are just making it more reasonable.

}
}
return Rollup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ data class Transform(
if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
// we instantiate the start time
if (schedule is IntervalSchedule) {
schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit)
schedule = IntervalSchedule(schedule.startTime ?: Instant.now(), schedule.interval, schedule.unit)
Copy link
Contributor Author

@ikibo ikibo Nov 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bowenlan-amzn I will remove this null check, the same story as with rollup.

}

// we clear out metadata if its a new job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,40 +663,24 @@ class RollupRunnerIT : RollupRestTestCase() {
putDateDocumentInSourceIndex(rollup)

// Create rollup job
rollup = createRollup(rollup = rollup, rollupId = rollup.id)
val jobStartTime = Instant.now()
val rollupNow = rollup.copy(
jobSchedule = IntervalSchedule(jobStartTime, 1, ChronoUnit.MINUTES),
ikibo marked this conversation as resolved.
Show resolved Hide resolved
jobEnabledTime = jobStartTime
)
rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id)

var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli()
val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500)
assertTrue("Delay was not correctly applied", delayIsCorrect)
val nextExecutionTime1 = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli()
ikibo marked this conversation as resolved.
Show resolved Hide resolved
assertTrue("The first job execution time should be after [job start time] + [delay].", nextExecutionTime1 >= jobStartTime.toEpochMilli() + delay)
assertTrue("The first job execution time should not be delayed too much after [job start time] + [delay].", nextExecutionTime1 <= jobStartTime.toEpochMilli() + delay + 100)
ikibo marked this conversation as resolved.
Show resolved Hide resolved

waitFor {
// Wait until half a second before the intended execution time
assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime - 500)
// Still should not have run at this point
assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex))
assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime1)
ikibo marked this conversation as resolved.
Show resolved Hide resolved
}
val rollupMetadata = waitFor {
assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex))
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertNotNull("Rollup metadata not found", rollupMetadata)
rollupMetadata
}
nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000
val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000
assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect)
val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime
val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime
// Assert that after the window was updated, it falls approximately around 'now'
assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now())
assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now())

// window length should be 5 seconds
val expectedWindowEnd = nextWindowStartTime.plusMillis(5000)
assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime)

val nextExecutionTime2 = rollup.schedule.getNextExecutionTime(null).toEpochMilli()
assertTrue("The second job execution time should be not earlier than a minute after the first execution.", nextExecutionTime2 - nextExecutionTime1 >= 60000)
assertTrue("The second job execution time should not be too delayed after the first execution.", nextExecutionTime2 - nextExecutionTime1 <= 60000 + 100)
}

fun `test non continuous delay does nothing`() {
Expand Down
Loading