From f754929bf297aec36968d526a4bb7a6999ca4779 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 3 May 2023 13:54:38 -0700 Subject: [PATCH] log errors and clean up monitor when indexing doc level queries or metadata creation fails Signed-off-by: Surya Sashank Nistala --- .../transport/TransportDeleteMonitorAction.kt | 145 +++++++++++------- .../transport/TransportIndexMonitorAction.kt | 60 ++++++-- .../alerting/MonitorDataSourcesIT.kt | 54 +++++++ .../resthandler/SecureMonitorRestApiIT.kt | 125 ++++++++++----- 4 files changed, 278 insertions(+), 106 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 6242daaab..f97f56c05 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -5,9 +5,8 @@ package org.opensearch.alerting.transport -import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException @@ -25,6 +24,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.opensearchapi.suspendUntil @@ -35,6 +35,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.action.AlertingActions @@ -44,7 +45,6 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.index.query.QueryBuilders import org.opensearch.index.reindex.BulkByScrollResponse import org.opensearch.index.reindex.DeleteByQueryAction @@ -57,6 +57,7 @@ import kotlin.coroutines.resume import kotlin.coroutines.resumeWithException import kotlin.coroutines.suspendCoroutine +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) class TransportDeleteMonitorAction @Inject constructor( @@ -87,8 +88,7 @@ class TransportDeleteMonitorAction @Inject constructor( if (!validateUserBackendRoles(user, actionListener)) { return } - - GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) { + scope.launch { DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart() } } @@ -109,9 +109,7 @@ class TransportDeleteMonitorAction @Inject constructor( checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) if (canDelete) { - val deleteResponse = deleteMonitor(monitor) - deleteDocLevelMonitorQueriesAndIndices(monitor) - deleteMetadata(monitor) + val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId) actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) } else { actionListener.onFailure( @@ -119,6 +117,7 @@ class TransportDeleteMonitorAction @Inject constructor( ) } } catch (t: Exception) { + log.error("Failed to delete monitor ${deleteRequest.id()}", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -140,68 +139,102 @@ class TransportDeleteMonitorAction @Inject constructor( ) return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } + } + + companion object { + @JvmStatic + suspend fun deleteAllResourcesForMonitor( + client: Client, + monitor: Monitor, + deleteRequest: DeleteRequest, + monitorId: String, + ): DeleteResponse { + val deleteResponse = deleteMonitorDocument(client, deleteRequest) + deleteMetadata(client, monitor) + deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId) + return deleteResponse + } - private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse { + private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse { return client.suspendUntil { delete(deleteRequest, it) } } - private suspend fun deleteMetadata(monitor: Monitor) { + suspend fun deleteMetadata(client: Client, monitor: Monitor) { val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") - val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + try { + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}") + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted, + // we cannot retry based on this failure + log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e) + } } - private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { - val clusterState = clusterService.state() - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + suspend fun deleteDocLevelMonitorQueriesAndIndices( + client: Client, + monitor: Monitor, + monitorId: String, + ) { + try { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitorId) + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitorId) + ) ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) { + cont.resumeWithException(t) + } + } + ) + } } } + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted successfully, + // we cannot retry based on this failure + log.error("Failed to delete workflow metadata for monitor ${monitor.id}.", e) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 1a759b5fa..310def8fe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -20,6 +20,7 @@ import org.opensearch.action.admin.cluster.health.ClusterHealthAction import org.opensearch.action.admin.cluster.health.ClusterHealthRequest import org.opensearch.action.admin.cluster.health.ClusterHealthResponse import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -45,7 +46,6 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.isADMonitor -import org.opensearch.alerting.util.use import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -53,6 +53,8 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType @@ -67,8 +69,6 @@ import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JO import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject -import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.core.xcontent.ToXContent import org.opensearch.index.query.QueryBuilders import org.opensearch.index.reindex.BulkByScrollResponse import org.opensearch.index.reindex.DeleteByQueryAction @@ -177,7 +177,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { val indices = mutableListOf() // todo: for doc level alerting: check if index is present before monitor is created. @@ -230,7 +230,7 @@ class TransportIndexMonitorAction @Inject constructor( client: Client, actionListener: ActionListener, request: IndexMonitorRequest, - user: User? + user: User?, ) { client.threadPool().threadContext.stashContext().use { IndexMonitorHandler(client, actionListener, request, user).resolveUserAndStartForAD() @@ -241,7 +241,7 @@ class TransportIndexMonitorAction @Inject constructor( private val client: Client, private val actionListener: ActionListener, private val request: IndexMonitorRequest, - private val user: User? + private val user: User?, ) { fun resolveUserAndStart() { @@ -492,16 +492,30 @@ class TransportIndexMonitorAction @Inject constructor( ) return } - request.monitor = request.monitor.copy(id = indexResponse.id) - var (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) - if (created == false) { - log.warn("Metadata doc id:${metadata.id} exists, but it shouldn't!") + var metadata: MonitorMetadata? + try { // delete monitor if metadata creation fails, log the right error and re-throw the error to fail listener + request.monitor = request.monitor.copy(id = indexResponse.id) + var (monitorMetadata: MonitorMetadata, created: Boolean) = MonitorMetadataService.getOrCreateMetadata(request.monitor) + if (created == false) { + log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") + } + metadata = monitorMetadata + } catch (t: Exception) { + log.error("failed to create metadata for monitor ${indexResponse.id}. deleting monitor") + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + try { + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(metadata, updating = true) + } catch (t: Exception) { + log.error("failed to index doc level queries monitor ${indexResponse.id}. deleting monitor", t) + cleanupMonitorAfterPartialFailure(request.monitor, indexResponse) + throw t } - // When inserting queries in queryIndex we could update sourceToQueryIndexMapping - MonitorMetadataService.upsertMetadata(metadata, updating = true) actionListener.onResponse( IndexMonitorResponse( @@ -514,6 +528,24 @@ class TransportIndexMonitorAction @Inject constructor( } } + private suspend fun cleanupMonitorAfterPartialFailure(monitor: Monitor, indexMonitorResponse: IndexResponse) { + // we simply log the success (debug log) or failure (error log) when we try clean up partially failed monitor creation request + try { + TransportDeleteMonitorAction.deleteAllResourcesForMonitor( + client, + monitor = monitor, + DeleteRequest(SCHEDULED_JOBS_INDEX, indexMonitorResponse.id).setRefreshPolicy(RefreshPolicy.IMMEDIATE), + indexMonitorResponse.id + ) + log.debug( + "Cleaned up monitor related resources after monitor creation request partial failure. " + + "Monitor id : ${indexMonitorResponse.id}" + ) + } catch (e: Exception) { + log.error("Failed to clean up monitor after monitor creation request partial failure", e) + } + } + @Suppress("UNCHECKED_CAST") private suspend fun indexDocLevelMonitorQueries( monitor: Monitor, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 5d802f074..46b0768da 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.close.CloseIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest @@ -755,6 +756,59 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match all 4 queries", 1, findings[0].docLevelQueries.size) } + fun `test cleanup monitor on partial create monitor failure`() { + val docQuery = DocLevelQuery(query = "dnbkjndsfkjbnds:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + val analyzer = "dfbdfbafd" + val testDoc = """{ + "rule": {"title": "some_title"}, + "message": "msg 1 2 3 4" + }""" + indexDoc(index, "2", testDoc) + client().admin().indices() + .create( + CreateIndexRequest(customQueryIndex + "-000001").alias(Alias(customQueryIndex)) + .mapping( + """ + { + "_meta": { + "schema_version": 1 + }, + "properties": { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent() + ) + ).get() + + client().admin().indices().close(CloseIndexRequest(customQueryIndex + "-000001")).get() + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), + ) + ) + try { + createMonitor(monitor) + fail("monitor creation should fail due to incorrect analyzer name in test setup") + } catch (e: Exception) { + Assert.assertEquals(client().search(SearchRequest(SCHEDULED_JOBS_INDEX)).get().hits.hits.size, 0) + } + } + fun `test execute monitor without create when no monitors exists`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt index edb0dab3a..6751c5162 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureMonitorRestApiIT.kt @@ -5,10 +5,11 @@ package org.opensearch.alerting.resthandler -import org.apache.hc.core5.http.ContentType -import org.apache.hc.core5.http.HttpHeaders -import org.apache.hc.core5.http.io.entity.StringEntity -import org.apache.hc.core5.http.message.BasicHeader +import org.apache.http.HttpHeaders +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.apache.http.message.BasicHeader +import org.apache.http.nio.entity.NStringEntity import org.junit.After import org.junit.Before import org.junit.BeforeClass @@ -39,21 +40,24 @@ import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomBucketLevelMonitor import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomDocumentLevelMonitor import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomTemplateScript import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.client.RestClient +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.authuser.User import org.opensearch.commons.rest.SecureRestClientBuilder -import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.RestStatus import org.opensearch.script.Script @@ -83,10 +87,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { if (userClient == null) { createUser(user, user, arrayOf()) - userClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), user, user) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + userClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), user, user).setSocketTimeout(60000).build() } } @@ -192,7 +193,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { val searchResponse = client().makeRequest( "GET", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, searchResponse.restStatus()) @@ -396,9 +397,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { getClusterPermissionsFromCustomRole(ALERTING_GET_MONITOR_ACCESS) ) val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + .setSocketTimeout(60000).build() val getMonitorResponse = getUserClient?.makeRequest( "GET", @@ -591,9 +590,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { getClusterPermissionsFromCustomRole(ALERTING_GET_MONITOR_ACCESS) ) val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + .setSocketTimeout(60000).build() val getMonitorResponse = getUserClient?.makeRequest( "GET", @@ -729,9 +726,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { getClusterPermissionsFromCustomRole(ALERTING_GET_MONITOR_ACCESS) ) val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + .setSocketTimeout(60000).build() val getMonitorResponse = getUserClient?.makeRequest( "GET", @@ -784,9 +779,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { ) val updateUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), updateUser, updateUser) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + .setSocketTimeout(60000).build() val updatedMonitor = updateMonitorWithClient(updateUserClient, createdMonitor, listOf("role5")) // old user should no longer have access @@ -837,9 +830,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { getClusterPermissionsFromCustomRole(ALERTING_GET_MONITOR_ACCESS) ) val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) - .setSocketTimeout(60000) - .setConnectionRequestTimeout(180000) - .build() + .setSocketTimeout(60000).build() val getMonitorResponse = getUserClient?.makeRequest( "GET", @@ -903,7 +894,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) @@ -937,7 +928,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) @@ -964,7 +955,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) assertEquals("Monitor not found during search", 1, getDocs(adminSearchResponse)) @@ -974,7 +965,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { userClient?.makeRequest( "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) fail("Expected 403 FORBIDDEN response") } catch (e: ResponseException) { @@ -993,7 +984,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, userOneSearchResponse?.restStatus()) assertEquals("Monitor not found during search", 1, getDocs(userOneSearchResponse)) @@ -1015,7 +1006,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) assertEquals("Monitor not found during search", 1, getDocs(adminSearchResponse)) @@ -1025,7 +1016,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { userClient?.makeRequest( "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) fail("Expected 403 FORBIDDEN response") } catch (e: ResponseException) { @@ -1039,7 +1030,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, userOneSearchResponse?.restStatus()) assertEquals("Monitor not found during search", 0, getDocs(userOneSearchResponse)) @@ -1155,6 +1146,68 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { } } + /** + * We want to verify that user roles/permissions do not affect clean up of monitors during partial monitor creation failure + */ + fun `test create monitor failure clean up with a user without delete monitor access`() { + enableFilterBy() + createUser(user, user, listOf(TEST_HR_BACKEND_ROLE, "role2").toTypedArray()) + createTestIndex(TEST_HR_INDEX) + createCustomIndexRole( + ALERTING_INDEX_MONITOR_ACCESS, + TEST_HR_INDEX, + getClusterPermissionsFromCustomRole(ALERTING_INDEX_MONITOR_ACCESS) + ) + createUserWithRoles( + user, + listOf(ALERTING_INDEX_MONITOR_ACCESS, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + val docLevelQueryIndex = ".opensearch-alerting-queries-000001" + createIndex( + docLevelQueryIndex, Settings.EMPTY, + """ + "properties" : { + "query": { + "type": "percolator_ext" + }, + "monitor_id": { + "type": "text" + }, + "index": { + "type": "text" + } + } + } + """.trimIndent(), + ".opensearch-alerting-queries" + ) + closeIndex(docLevelQueryIndex) // close index to simulate doc level query indexing failure + try { + val monitor = randomDocumentLevelMonitor( + withMetadata = false, + triggers = listOf(), + inputs = listOf(DocLevelMonitorInput("description", listOf(TEST_HR_INDEX), emptyList())) + ) + userClient?.makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) + fail("Monitor creation should have failed due to error in indexing doc level queries") + } catch (e: ResponseException) { + val search = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()).size(10).toString() + val searchResponse = client().makeRequest( + "GET", "$ALERTING_BASE_URI/_search", + emptyMap(), + StringEntity(search, ContentType.APPLICATION_JSON) + ) + val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) + val hits = xcp.map()["hits"]!! as Map> + val numberDocsFound = hits["total"]?.get("value") + assertEquals("Monitors found. Clean up unsuccessful", 0, numberDocsFound) + } finally { + deleteRoleAndRoleMapping(ALERTING_INDEX_MONITOR_ACCESS) + } + } + fun `test query all alerts in all states with disabled filter by`() { disableFilterBy() @@ -1322,7 +1375,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "POST", "$ALERTING_BASE_URI/_search", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) assertEquals("Monitor not found during search", 1, getDocs(adminSearchResponse)) @@ -1333,7 +1386,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "GET", "$ALERTING_BASE_URI/$id", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Get monitor failed", RestStatus.OK, adminGetResponse.restStatus()) @@ -1342,7 +1395,7 @@ class SecureMonitorRestApiIT : AlertingRestTestCase() { "DELETE", "$ALERTING_BASE_URI/$id", emptyMap(), - StringEntity(search, ContentType.APPLICATION_JSON) + NStringEntity(search, ContentType.APPLICATION_JSON) ) assertEquals("Delete monitor failed", RestStatus.OK, adminDeleteResponse.restStatus()) } finally {