Skip to content

Commit

Permalink
Update sweep logic to re-schedule unchanged jobs when SWEEPER_ENABLED…
Browse files Browse the repository at this point in the history
… is toggled (#243)

* Update sweep logic to re-schedule unchanged jobs when SWEEPER_ENABLED is toggled

Signed-off-by: Mohammad Qureshi <[email protected]>

* Update isMonitorScheduled to not use var for returned boolean

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Dec 13, 2021
1 parent cc87105 commit ea7d526
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return getMonitor(monitorId = monitorId)
}

@Suppress("UNCHECKED_CAST")
protected fun updateMonitor(monitor: Monitor, refresh: Boolean = false): Monitor {
val response = client().makeRequest(
"PUT", "${monitor.relativeUrl()}?refresh=$refresh",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
}
}

/* Enable this test case after issue issue#269 is fixed.
/* Enable this test case after checking for disallowed destination during Monitor creation is added in
fun `test creating a monitor with a disallowed destination type fails`() {
try {
// Create a Chime Destination
Expand All @@ -284,7 +284,8 @@ class MonitorRestApiIT : AlertingRestTestCase() {
chime = chime,
slack = null,
customWebhook = null,
email = null)
email = null
)
val chimeDestination = createDestination(destination = destination)
// Remove Chime from the allow_list
Expand All @@ -293,12 +294,13 @@ class MonitorRestApiIT : AlertingRestTestCase() {
.joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" }
client().updateSettings(DestinationSettings.ALLOW_LIST.key, allowedDestinations)
createMonitor(randomMonitor(triggers = listOf(randomTrigger(destinationId = chimeDestination.id))))
createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(destinationId = chimeDestination.id))))
fail("Expected 403 Method FORBIDDEN response")
} catch (e: ResponseException) {
assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus())
}
}*/
}
*/

@Throws(Exception::class)
fun `test updating search for a monitor`() {
Expand Down Expand Up @@ -887,23 +889,62 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Scheduled job is not enabled", false, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"])
val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() {
// Enable Monitor jobs
enableScheduledJob()
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id

var alertingStats = getAlertingStats()
assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"])
assertEquals("Scheduled job index is not yellow", "yellow", alertingStats["scheduled_job_index_status"])
assertEquals("Nodes are not on schedule", numberOfNodes, alertingStats["nodes_on_schedule"])

val _nodes = alertingStats["_nodes"] as Map<String, Int>
validateAlertingStatsNodeResponse(_nodes)

assertTrue(
"Monitor [$monitorId] was not found scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)

// Disable Monitor jobs
disableScheduledJob()

alertingStats = getAlertingStats()
assertEquals("Scheduled job is still enabled", false, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertFalse(
"Monitor [$monitorId] was still scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)

// Re-enable Monitor jobs
enableScheduledJob()

// Sleep briefly so sweep can reschedule the Monitor
Thread.sleep(2000)

alertingStats = getAlertingStats()
assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertTrue(
"Monitor [$monitorId] was not re-scheduled based on the alerting stats response: $alertingStats",
isMonitorScheduled(monitorId, alertingStats)
)
}

fun `test monitor stats no jobs`() {
// Disable the Monitor plugin.
// Enable the Monitor plugin.
enableScheduledJob()

val responseMap = getAlertingStats()
// assertEquals("Cluster name is incorrect", responseMap["cluster_name"], "alerting_integTestCluster")
assertEquals("Scheduled job is not enabled", true, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key])
assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"])
val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats jobs`() {
Expand All @@ -919,9 +960,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Nodes are not on schedule", numberOfNodes, responseMap["nodes_on_schedule"])

val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

@Throws(Exception::class)
Expand Down Expand Up @@ -950,9 +989,7 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Nodes not on schedule", numberOfNodes, responseMap["nodes_on_schedule"])

val _nodes = responseMap["_nodes"] as Map<String, Int>
assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"])
assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"])
validateAlertingStatsNodeResponse(_nodes)
}

fun `test monitor stats incorrect metric`() {
Expand Down Expand Up @@ -1045,4 +1082,23 @@ class MonitorRestApiIT : AlertingRestTestCase() {
assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
}
}

private fun validateAlertingStatsNodeResponse(nodesResponse: Map<String, Int>) {
assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"])
assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"])
assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"])
}

private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map<String, Any>): Boolean {
val nodesInfo = alertingStatsResponse["nodes"] as Map<String, Any>
for (nodeId in nodesInfo.keys) {
val nodeInfo = nodesInfo[nodeId] as Map<String, Any>
val jobsInfo = nodeInfo["jobs_info"] as Map<String, Any>
if (jobsInfo.keys.contains(monitorId)) {
return true
}
}

return false
}
}
42 changes: 39 additions & 3 deletions core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,13 @@ class JobSweeper(
// cancel existing background thread if present
scheduledFullSweep?.cancel()

// Manually sweep all shards before scheduling the background sweep so it picks up any changes immediately
// since the first run of a task submitted with scheduleWithFixedDelay() happens after the interval has passed.
logger.debug("Performing sweep of scheduled jobs.")
fullSweepExecutor.submit {
sweepAllShards()
}

// Setup an anti-entropy/self-healing background sweep, in case a sweep that was triggered by an event fails.
val scheduledSweep = Runnable {
val elapsedTime = getFullSweepElapsedTime()
Expand Down Expand Up @@ -372,13 +379,19 @@ class JobSweeper(
sweptJobs.getOrPut(shardId) { ConcurrentHashMap() }
// Use [compute] to update atomically in case another thread concurrently indexes/deletes the same job
.compute(jobId) { _, currentVersion ->
val jobCurrentlyScheduled = scheduler.scheduledJobs().contains(jobId)

if (newVersion <= (currentVersion ?: Versions.NOT_FOUND)) {
logger.debug("Skipping job $jobId, $newVersion <= $currentVersion")
return@compute currentVersion
if (unchangedJobToBeRescheduled(newVersion, currentVersion, jobCurrentlyScheduled, job)) {
logger.debug("Not skipping job $jobId since it is an unchanged job slated to be rescheduled")
} else {
logger.debug("Skipping job $jobId, $newVersion <= $currentVersion")
return@compute currentVersion
}
}

// deschedule the currently scheduled version
if (scheduler.scheduledJobs().contains(jobId)) {
if (jobCurrentlyScheduled) {
scheduler.deschedule(jobId)
}

Expand All @@ -396,6 +409,29 @@ class JobSweeper(
}
}

/*
* During the job sweep, normally jobs where the currentVersion is equal to the newVersion are skipped since
* there was no change.
*
* However, there exists an edge-case where a job could have been de-scheduled by flipping [SWEEPER_ENABLED]
* to false and then not have undergone any changes when the sweeper is re-enabled. In this case, the job should
* not be skipped so it can be re-scheduled. This utility method checks for this condition so the sweep() method
* can account for it.
*/
private fun unchangedJobToBeRescheduled(
newVersion: JobVersion,
currentVersion: JobVersion?,
jobCurrentlyScheduled: Boolean,
job: ScheduledJob?
): Boolean {
// newVersion should not be [Versions.NOT_FOUND] here since it's passed in from existing search hits
// or successful doc delete operations
val versionWasUnchanged = newVersion == (currentVersion ?: Versions.NOT_FOUND)
val jobEnabled = job?.enabled ?: false

return versionWasUnchanged && !jobCurrentlyScheduled && jobEnabled
}

private fun parseAndSweepJob(
xcp: XContentParser,
shardId: ShardId,
Expand Down

0 comments on commit ea7d526

Please sign in to comment.