diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index b48db925e..77a0e5656 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -421,6 +421,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return getMonitor(monitorId = monitorId) } + @Suppress("UNCHECKED_CAST") protected fun updateMonitor(monitor: Monitor, refresh: Boolean = false): Monitor { val response = client().makeRequest( "PUT", "${monitor.relativeUrl()}?refresh=$refresh", diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index 25ac5b4a4..6e607a186 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -271,7 +271,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { } } - /* Enable this test case after issue issue#269 is fixed. + /* Enable this test case after checking for disallowed destination during Monitor creation is added in fun `test creating a monitor with a disallowed destination type fails`() { try { // Create a Chime Destination @@ -284,7 +284,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { chime = chime, slack = null, customWebhook = null, - email = null) + email = null + ) val chimeDestination = createDestination(destination = destination) // Remove Chime from the allow_list @@ -293,12 +294,13 @@ class MonitorRestApiIT : AlertingRestTestCase() { .joinToString(prefix = "[", postfix = "]") { string -> "\"$string\"" } client().updateSettings(DestinationSettings.ALLOW_LIST.key, allowedDestinations) - createMonitor(randomMonitor(triggers = listOf(randomTrigger(destinationId = chimeDestination.id)))) + createMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(destinationId = chimeDestination.id)))) fail("Expected 403 Method FORBIDDEN response") } catch (e: ResponseException) { assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) } - }*/ + } + */ @Throws(Exception::class) fun `test updating search for a monitor`() { @@ -887,13 +889,54 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Scheduled job is not enabled", false, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key]) assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) + } + + fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() { + // Enable Monitor jobs + enableScheduledJob() + val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id + + var alertingStats = getAlertingStats() + assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"]) + assertEquals("Scheduled job index is not yellow", "yellow", alertingStats["scheduled_job_index_status"]) + assertEquals("Nodes are not on schedule", numberOfNodes, alertingStats["nodes_on_schedule"]) + + val _nodes = alertingStats["_nodes"] as Map + validateAlertingStatsNodeResponse(_nodes) + + assertTrue( + "Monitor [$monitorId] was not found scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) + + // Disable Monitor jobs + disableScheduledJob() + + alertingStats = getAlertingStats() + assertEquals("Scheduled job is still enabled", false, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + assertFalse( + "Monitor [$monitorId] was still scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) + + // Re-enable Monitor jobs + enableScheduledJob() + + // Sleep briefly so sweep can reschedule the Monitor + Thread.sleep(2000) + + alertingStats = getAlertingStats() + assertEquals("Scheduled job is not enabled", true, alertingStats[ScheduledJobSettings.SWEEPER_ENABLED.key]) + assertTrue( + "Monitor [$monitorId] was not re-scheduled based on the alerting stats response: $alertingStats", + isMonitorScheduled(monitorId, alertingStats) + ) } fun `test monitor stats no jobs`() { - // Disable the Monitor plugin. + // Enable the Monitor plugin. enableScheduledJob() val responseMap = getAlertingStats() @@ -901,9 +944,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Scheduled job is not enabled", true, responseMap[ScheduledJobSettings.SWEEPER_ENABLED.key]) assertEquals("Scheduled job index exists but there are no scheduled jobs.", false, responseMap["scheduled_job_index_exists"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } fun `test monitor stats jobs`() { @@ -919,9 +960,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Nodes are not on schedule", numberOfNodes, responseMap["nodes_on_schedule"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } @Throws(Exception::class) @@ -950,9 +989,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Nodes not on schedule", numberOfNodes, responseMap["nodes_on_schedule"]) val _nodes = responseMap["_nodes"] as Map - assertEquals("Incorrect number of nodes", numberOfNodes, _nodes["total"]) - assertEquals("Failed nodes found during monitor stats call", 0, _nodes["failed"]) - assertEquals("More than $numberOfNodes successful node", numberOfNodes, _nodes["successful"]) + validateAlertingStatsNodeResponse(_nodes) } fun `test monitor stats incorrect metric`() { @@ -1045,4 +1082,23 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) } } + + private fun validateAlertingStatsNodeResponse(nodesResponse: Map) { + assertEquals("Incorrect number of nodes", numberOfNodes, nodesResponse["total"]) + assertEquals("Failed nodes found during monitor stats call", 0, nodesResponse["failed"]) + assertEquals("More than $numberOfNodes successful node", numberOfNodes, nodesResponse["successful"]) + } + + private fun isMonitorScheduled(monitorId: String, alertingStatsResponse: Map): Boolean { + val nodesInfo = alertingStatsResponse["nodes"] as Map + for (nodeId in nodesInfo.keys) { + val nodeInfo = nodesInfo[nodeId] as Map + val jobsInfo = nodeInfo["jobs_info"] as Map + if (jobsInfo.keys.contains(monitorId)) { + return true + } + } + + return false + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt index b88dad8be..c2441bf68 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt @@ -253,6 +253,13 @@ class JobSweeper( // cancel existing background thread if present scheduledFullSweep?.cancel() + // Manually sweep all shards before scheduling the background sweep so it picks up any changes immediately + // since the first run of a task submitted with scheduleWithFixedDelay() happens after the interval has passed. + logger.debug("Performing sweep of scheduled jobs.") + fullSweepExecutor.submit { + sweepAllShards() + } + // Setup an anti-entropy/self-healing background sweep, in case a sweep that was triggered by an event fails. val scheduledSweep = Runnable { val elapsedTime = getFullSweepElapsedTime() @@ -372,13 +379,19 @@ class JobSweeper( sweptJobs.getOrPut(shardId) { ConcurrentHashMap() } // Use [compute] to update atomically in case another thread concurrently indexes/deletes the same job .compute(jobId) { _, currentVersion -> + val jobCurrentlyScheduled = scheduler.scheduledJobs().contains(jobId) + if (newVersion <= (currentVersion ?: Versions.NOT_FOUND)) { - logger.debug("Skipping job $jobId, $newVersion <= $currentVersion") - return@compute currentVersion + if (unchangedJobToBeRescheduled(newVersion, currentVersion, jobCurrentlyScheduled, job)) { + logger.debug("Not skipping job $jobId since it is an unchanged job slated to be rescheduled") + } else { + logger.debug("Skipping job $jobId, $newVersion <= $currentVersion") + return@compute currentVersion + } } // deschedule the currently scheduled version - if (scheduler.scheduledJobs().contains(jobId)) { + if (jobCurrentlyScheduled) { scheduler.deschedule(jobId) } @@ -396,6 +409,29 @@ class JobSweeper( } } + /* + * During the job sweep, normally jobs where the currentVersion is equal to the newVersion are skipped since + * there was no change. + * + * However, there exists an edge-case where a job could have been de-scheduled by flipping [SWEEPER_ENABLED] + * to false and then not have undergone any changes when the sweeper is re-enabled. In this case, the job should + * not be skipped so it can be re-scheduled. This utility method checks for this condition so the sweep() method + * can account for it. + */ + private fun unchangedJobToBeRescheduled( + newVersion: JobVersion, + currentVersion: JobVersion?, + jobCurrentlyScheduled: Boolean, + job: ScheduledJob? + ): Boolean { + // newVersion should not be [Versions.NOT_FOUND] here since it's passed in from existing search hits + // or successful doc delete operations + val versionWasUnchanged = newVersion == (currentVersion ?: Versions.NOT_FOUND) + val jobEnabled = job?.enabled ?: false + + return versionWasUnchanged && !jobCurrentlyScheduled && jobEnabled + } + private fun parseAndSweepJob( xcp: XContentParser, shardId: ShardId,