diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 6242daaab..b96ef8119 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -5,9 +5,8 @@ package org.opensearch.alerting.transport -import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.opensearchapi.suspendUntil @@ -57,6 +57,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) class TransportDeleteMonitorAction @Inject constructor( @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor( if (!validateUserBackendRoles(user, actionListener)) { return } - - GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) { + scope.launch { DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart() } } @@ -109,9 +109,7 @@ class TransportDeleteMonitorAction @Inject constructor( checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) if (canDelete) { - val deleteResponse = deleteMonitor(monitor) - deleteDocLevelMonitorQueriesAndIndices(monitor) - deleteMetadata(monitor) + val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId) actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) } else { actionListener.onFailure( @@ -119,6 +117,7 @@ class TransportDeleteMonitorAction @Inject constructor( ) } } catch (t: Exception) { + log.error("Failed to delete monitor ${deleteRequest.id()}", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor( ) return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } + } + + companion object { + @JvmStatic + suspend fun deleteAllResourcesForMonitor( + client: Client, + monitor: Monitor, + deleteRequest: DeleteRequest, + monitorId: String, + ): DeleteResponse { + val deleteResponse = deleteMonitorDocument(client, deleteRequest) + deleteMetadata(client, monitor) + deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId) + return deleteResponse + } - private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse { + private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse { return client.suspendUntil { delete(deleteRequest, it) } } - private suspend fun deleteMetadata(monitor: Monitor) { + suspend fun deleteMetadata(client: Client, monitor: Monitor) { val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") - val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + try { + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}") + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted, + // we cannot retry based on this failure + log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e) + } } - private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { - val clusterState = clusterService.state() - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + suspend fun deleteDocLevelMonitorQueriesAndIndices( + client: Client, + monitor: Monitor, + monitorId: String, + ) { + try { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitorId) + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitorId) + ) ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) { + cont.resumeWithException(t) + } + } + ) + } } } + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted successfully, + // we cannot retry based on this failure + log.error("Failed to delete doc level queries from query index.", e) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 1a759b5fa..fb34f56c6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -177,7 +178,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { val indices = mutableListOf() // todo: for doc level alerting: check if index is present before monitor is created. @@ -230,7 +231,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { client.threadPool().threadContext.stashContext().use { IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD() @@ -241,7 +242,7 @@ class TransportIndexMonitorAction @Inject constructor( private val client: Client, private val actionListener: ActionListener, private val request: IndexMonitorRequest, - private val user: User? + private val user: User?, ) { fun resolveUserAndStart() { @@ -492,16 +493,30 @@ class TransportIndexMonitorAction @Inject constructor( ) return } - request.monitor = request.monitor.copy(id = indexResponse.id) - var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) - if (created == false) { - log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!") + var metadata: MonitorMetadata? + try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener + request.monitor = request.monitor.copy(id = indexResponse.id) + var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + if (created == false) { + log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") + } + metadata = monitorMetadata + } catch (t: Exception) { + log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor") + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + try { + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(metadata, updating = true) + } catch (t: Exception) { + log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t) + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - // When inserting queries in queryIndex we could update sourceToQueryIndexMapping - MonitorMetadataService.upsertMetadata(metadata, updating = true) actionListener.onResponse( IndexMonitorResponse( @@ -514,6 +529,24 @@ class TransportIndexMonitorAction @Inject constructor( } } + private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) { + // we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request + try { + TransportDeleteMonitorAction.deleteAllResourcesForMonitor( + client, + monitor = monitor, + DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE), + indexMonitorResponse.id + ) + log.debug( + "Cleaned up monitor related resources after monitor creation request partial failure. " + + "Monitor id : ${indexMonitorResponse.id}" + ) + } catch (e: Exception) { + log.error("Failed to clean up monitor after monitor creation request partial failure", e) + } + } + @Suppress("UNCHECKED_CAST") private suspend fun indexDocLevelMonitorQueries( monitor: Monitor, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index e647b31eb..ba138617a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.close.CloseIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest @@ -907,6 +908,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } + fun `test cleanup monitor on partial create monitor failure`() { + val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + val analyzer = "dfbdfbafd" + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + client().admin().indices() + .create( + CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex)) + .mapping( + """ + { + "_meta": { + "schema_version": 1 + }, + "properties": { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent() + ) + ).get() + + client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get() + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), + ) + ) + try { + createMonitor(monitor) + fail("monitor creation should fail due to incorrect analyzer name in test setup") + } catch (e: Exception) { + Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0) + } + } + fun `test execute monitor without create when no monitors exists`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt index edb0dab3a..fd51c3e59 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt @@ -39,17 +39,20 @@ import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomBucketLevelMonitor import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomDocumentLevelMonitor import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.authuser.User import org.opensearch.commons.rest.SecureRestClientBuilder @@ -1479,4 +1482,66 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { deleteRoleAndRoleMapping(TEST_HR_ROLE) } } + + /** + * We want to verify that user roles/permissions do not affect clean up of monitors during partial monitor creation failure + */ + fun `test create monitor failure clean up with a user without delete monitor access`() { + enableFilterBy() + createUser(user, user, listOf(TEST_HR_BACKEND_ROLE, "role2").toTypedArray()) + createTestIndex(TEST_HR_INDEX) + createCustomIndexRole( + ALERTING_INDEX_MONITOR_ACCESS, + TEST_HR_INDEX, + getClusterPermissionsFromCustomRole(ALERTING_INDEX_MONITOR_ACCESS) + ) + createUserWithRoles( + user, + listOf(ALERTING_INDEX_MONITOR_ACCESS, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + val docLevelQueryIndex = ".opensearch-alerting-queries-000001" + createIndex( + docLevelQueryIndex, Settings.EMPTY, + """ + "properties" : { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent(), + ".opensearch-alerting-queries" + ) + closeIndex(docLevelQueryIndex) // close index to simulate doc level query indexing failure + try { + val monitor = randomDocumentLevelMonitor( + withMetadata = false, + triggers = listOf(), + inputs = listOf(DocLevelMonitorInput("description", listOf(TEST_HR_INDEX), emptyList())) + ) + userClient?.makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) + fail("Monitor creation should have failed due to error in indexing doc level queries") + } catch (e: ResponseException) { + val search = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(10).toString() + val searchResponse = client().makeRequest( + "GET", "$ALERTING_BASE_URI/_search", + emptyMap(), + StringEntity(search, ContentType.APPLICATION_JSON) + ) + val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") + assertEquals("Monitors found. Clean up unsuccessful", 0, numberDocsFound) + } finally { + deleteRoleAndRoleMapping(ALERTING_INDEX_MONITOR_ACCESS) + } + } }