Skip to content

Commit

Permalink
adds tests and logs failure or success of cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed May 8, 2023
1 parent 8928a8c commit 6c11123
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
Expand All @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.opensearchapi.suspendUntil
Expand Down Expand Up @@ -57,6 +57,7 @@ import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

class TransportDeleteMonitorAction @Inject constructor(
Expand Down Expand Up @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor(
if (!validateUserBackendRoles(user, actionListener)) {
return
}

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
scope.launch {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
}
}
Expand Down Expand Up @@ -146,7 +146,7 @@ class TransportDeleteMonitorAction @Inject constructor(
}

private suspend fun deleteMetadata(monitor: Monitor) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand Down Expand Up @@ -178,7 +179,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
val indices = mutableListOf<String>()
// todo: for doc level alerting: check if index is present before monitor is created.
Expand Down Expand Up @@ -231,7 +232,7 @@ class TransportIndexMonitorAction @Inject constructor(
client: Client,
actionListener: ActionListener<IndexMonitorResponse>,
request: IndexMonitorRequest,
user: User?
user: User?,
) {
client.threadPool().threadContext.stashContext().use {
IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD()
Expand All @@ -242,7 +243,7 @@ class TransportIndexMonitorAction @Inject constructor(
private val client: Client,
private val actionListener: ActionListener<IndexMonitorResponse>,
private val request: IndexMonitorRequest,
private val user: User?
private val user: User?,
) {

fun resolveUserAndStart() {
Expand Down Expand Up @@ -503,10 +504,7 @@ class TransportIndexMonitorAction @Inject constructor(
metadata = monitorMetadata
} catch (t: Exception) {
log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor")
client.execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
DeleteMonitorRequest(indexResponse.id, RefreshPolicy.IMMEDIATE)
)
cleanupMonitorAfterPartialFailure(indexResponse)
throw t
}
try {
Expand All @@ -517,10 +515,7 @@ class TransportIndexMonitorAction @Inject constructor(
MonitorMetadataService.upsertMetadata(metadata, updating = true)
} catch (t: Exception) {
log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t)
client.execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
DeleteMonitorRequest(indexResponse.id, RefreshPolicy.IMMEDIATE)
)
cleanupMonitorAfterPartialFailure(indexResponse)
throw t
}

Expand All @@ -535,6 +530,22 @@ class TransportIndexMonitorAction @Inject constructor(
}
}

private suspend fun cleanupMonitorAfterPartialFailure(indexMonitorResponse: IndexResponse) {
// we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request
try {
val dmr: DeleteMonitorResponse = client.suspendUntil {
client.execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE,
DeleteMonitorRequest(indexMonitorResponse.id, RefreshPolicy.IMMEDIATE),
it
)
}
log.debug("Cleaned up monitor after monitor creation request partial failure. Monitor id : ${dmr.id}")
} catch (e: Exception) {
log.error("Failed to clean up monitor after monitor creation request partial failure", e)
}
}

@Suppress("UNCHECKED_CAST")
private suspend fun indexDocLevelMonitorQueries(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.alias.Alias
import org.opensearch.action.admin.indices.close.CloseIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest
Expand All @@ -23,7 +24,6 @@ import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.DocLevelMonitorQueries.Companion.INDEX_PATTERN_SUFFIX
Expand Down Expand Up @@ -755,12 +755,43 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size)
}

fun `test execute monitor without create when no monitors exists`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
fun `test cleanup monitor on partial create monitor failure`() {
val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "whitespace"
val analyzer = "dfbdfbafd"
val testDoc = """{
"rule": {"title": "some_title"},
"message": "msg 1 2 3 4"
}"""
indexDoc(index, "2", testDoc)
client().admin().indices()
.create(
CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex))
.mapping(
"""
{
"_meta": {
"schema_version": 1
},
"properties": {
"query": {
"type": "percolator_ext"
},
"monitor_id": {
"type": "text"
},
"index": {
"type": "text"
}
}
}
""".trimIndent()
)
)

client().admin().indices().close(CloseIndexRequest(customQueryIndex))
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
Expand All @@ -769,31 +800,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
var executeMonitorResponse = executeMonitor(monitor, null)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

assertIndexNotExists(SCHEDULED_JOBS_INDEX)

val createMonitorResponse = createMonitor(monitor)

assertIndexExists(SCHEDULED_JOBS_INDEX)

indexDoc(index, "1", testDoc)

executeMonitorResponse = executeMonitor(monitor, createMonitorResponse?.id, dryRun = false)

Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
Assert.assertEquals(
(executeMonitorResponse.monitorRunResult.triggerResults.iterator().next().value as DocumentLevelTriggerRunResult)
.triggeredDocs.size,
1
)
try {
createMonitor(monitor)
fail("monitor creation should fail due to incorrect analyzer name in test setup")
} catch (e: Exception) {
Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0)
}
}

fun `test execute monitor with custom query index and custom field mappings`() {
Expand Down

0 comments on commit 6c11123

Please sign in to comment.