diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 5f6140713..785f6b900 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -13,6 +13,7 @@ import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest @@ -40,6 +41,7 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.NoOpTrigger import org.opensearch.commons.alerting.model.Trigger import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.core.xcontent.NamedXContentRegistry @@ -47,6 +49,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.sort.SortOrder import java.time.Instant import java.util.UUID @@ -187,6 +190,19 @@ class AlertService( ) } + fun composeMonitorErrorAlert( + id: String, + monitor: Monitor, + alertError: AlertError + ): Alert { + val currentTime = Instant.now() + return Alert( + id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime, + lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message, + schemaVersion = IndexUtils.alertIndexSchemaVersion + ) + } + fun updateActionResultsForBucketLevelAlert( currentAlert: Alert, actionResults: Map, @@ -281,6 +297,60 @@ class AlertService( } ?: listOf() } + suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) { + val errorAlertIdPrefix = "error-alert" + val newErrorAlertId = "$errorAlertIdPrefix-${monitor.id}-${UUID.randomUUID()}" + + val searchRequest = SearchRequest("${monitor.dataSources.alertsIndex}*") + .source( + SearchSourceBuilder() + .sort(Alert.START_TIME_FIELD, SortOrder.DESC) + .query( + QueryBuilders.boolQuery() + .must(QueryBuilders.queryStringQuery("${Alert.ALERT_ID_FIELD}:$errorAlertIdPrefix*")) + .must(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id)) + .must(QueryBuilders.termQuery(Alert.STATE_FIELD, Alert.State.ERROR)) + ) + ) + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + + var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage)) + + if (searchResponse.hits.totalHits.value > 0L) { + if (searchResponse.hits.totalHits.value > 1L) { + logger.warn("There are [${searchResponse.hits.totalHits.value}] error alerts for monitor [${monitor.id}]") + } + // Deserialize first/latest Alert + val hit = searchResponse.hits.hits[0] + val xcp = contentParser(hit.sourceRef) + val existingErrorAlert = Alert.parse(xcp, hit.id, hit.version) + + val currentTime = Instant.now() + alert = if (alert.errorMessage != existingErrorAlert.errorMessage) { + var newErrorHistory = existingErrorAlert.errorHistory.update( + AlertError(existingErrorAlert.startTime, existingErrorAlert.errorMessage!!) + ) + alert.copy( + id = existingErrorAlert.id, + errorHistory = newErrorHistory, + startTime = currentTime, + lastNotificationTime = currentTime + ) + } else { + existingErrorAlert.copy(startTime = Instant.now(), lastNotificationTime = currentTime) + } + } + + val alertIndexRequest = IndexRequest(monitor.dataSources.alertsIndex) + .routing(alert.monitorId) + .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) + .opType(DocWriteRequest.OpType.INDEX) + .id(alert.id) + + val indexResponse: IndexResponse = client.suspendUntil { index(alertIndexRequest, it) } + logger.debug("Monitor error Alert successfully upserted. Op result: ${indexResponse.result}") + } + suspend fun saveAlerts( dataSources: DataSources, alerts: List, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 2ff0d5075..73a7c869e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -20,6 +20,7 @@ import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext import org.opensearch.alerting.util.AlertingException @@ -192,63 +193,88 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) - } catch (e: Exception) { - logger.error("Failed to start Document-level-monitor ${monitor.name}", e) - val alertingException = AlertingException( - ExceptionsHelper.unwrapCause(e).cause?.message.toString(), - RestStatus.INTERNAL_SERVER_ERROR, - e - ) - monitorResult = monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) - } - /* - populate the map queryToDocIds with pairs of - this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser - */ - queries.forEach { - if (inputRunResults.containsKey(it.id)) { - queryToDocIds[it] = inputRunResults[it.id]!! + /* + populate the map queryToDocIds with pairs of + this fixes the issue of passing id, name, tags fields of DocLevelQuery object correctly to TriggerExpressionParser + */ + queries.forEach { + if (inputRunResults.containsKey(it.id)) { + queryToDocIds[it] = inputRunResults[it.id]!! + } } - } - val idQueryMap: Map = queries.associateBy { it.id } + val idQueryMap: Map = queries.associateBy { it.id } - val triggerResults = mutableMapOf() - // If there are no triggers defined, we still want to generate findings - if (monitor.triggers.isEmpty()) { - if (dryrun == false && monitor.id != Monitor.NO_ID) { - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) + val triggerResults = mutableMapOf() + // If there are no triggers defined, we still want to generate findings + if (monitor.triggers.isEmpty()) { + if (dryrun == false && monitor.id != Monitor.NO_ID) { + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } + createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) + } + } + } else { + monitor.triggers.forEach { + triggerResults[it.id] = runForEachDocTrigger( + monitorCtx, + monitorResult, + it as DocumentLevelTrigger, + monitor, + idQueryMap, + docsToQueries, + queryToDocIds, + dryrun + ) } } - } else { - monitor.triggers.forEach { - triggerResults[it.id] = runForEachDocTrigger( - monitorCtx, - monitorResult, - it as DocumentLevelTrigger, - monitor, - idQueryMap, - docsToQueries, - queryToDocIds, - dryrun + // Don't update monitor if this is a test monitor + if (!isTempMonitor) { + // If any error happened during trigger execution, upsert monitor error alert + val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) + if (errorMessage.isNotEmpty()) { + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage) + } + + MonitorMetadataService.upsertMetadata( + monitorMetadata.copy(lastRunContext = updatedLastRunContext), + true ) } - } - // Don't update monitor if this is a test monitor - if (!isTempMonitor) { - MonitorMetadataService.upsertMetadata( - monitorMetadata.copy(lastRunContext = updatedLastRunContext), - true + // TODO: Update the Document as part of the Trigger and return back the trigger action result + return monitorResult.copy(triggerResults = triggerResults) + } catch (e: Exception) { + val errorMessage = ExceptionsHelper.detailedMessage(e) + monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage) + logger.error("Failed running Document-level-monitor ${monitor.name}", e) + val alertingException = AlertingException( + errorMessage, + RestStatus.INTERNAL_SERVER_ERROR, + e ) + return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } + } - // TODO: Update the Document as part of the Trigger and return back the trigger action result - return monitorResult.copy(triggerResults = triggerResults) + private fun constructErrorMessageFromTriggerResults( + triggerResults: MutableMap? = null + ): String { + var errorMessage = "" + if (triggerResults != null) { + val triggersErrorBuilder = StringBuilder() + triggerResults.forEach { + if (it.value.error != null) { + triggersErrorBuilder.append("[${it.key}]: [${it.value.error!!.userErrorMessage()}]").append(" | ") + } + } + if (triggersErrorBuilder.isNotEmpty()) { + errorMessage = "Trigger errors: $triggersErrorBuilder" + } + } + return errorMessage } private suspend fun runForEachDocTrigger( @@ -295,16 +321,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { alerts.add(alert) } - if (findingDocPairs.isEmpty() && monitorResult.error != null) { - val alert = monitorCtx.alertService!!.composeDocLevelAlert( - listOf(), - listOf(), - triggerCtx, - monitorResult.alertError() ?: triggerResult.alertError() - ) - alerts.add(alert) - } - val shouldDefaultToPerExecution = defaultToPerExecutionAction( monitorCtx.maxActionableAlertCount, monitorId = monitor.id, @@ -576,8 +592,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() { searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - val response: SearchResponse = monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it) + var response: SearchResponse + try { + response = monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it) + } + } catch (e: Exception) { + throw IllegalStateException( + "Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e + ) } if (response.status() !== RestStatus.OK) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index d8e360155..23375dc0e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -351,6 +351,7 @@ class AlertIndices( } if (existsResponse.isExists) return true + logger.debug("index: [$index] schema mappings: [$schemaMapping]") val request = CreateIndexRequest(index) .mapping(schemaMapping) .settings(Settings.builder().put("index.hidden", true).build()) @@ -474,7 +475,7 @@ class AlertIndices( clusterStateRequest, object : ActionListener { override fun onResponse(clusterStateResponse: ClusterStateResponse) { - if (!clusterStateResponse.state.metadata.indices.isEmpty()) { + if (clusterStateResponse.state.metadata.indices.isNotEmpty()) { val indicesToDelete = getIndicesToDelete(clusterStateResponse) logger.info("Deleting old $tag indices viz $indicesToDelete") deleteAllOldHistoryIndices(indicesToDelete) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index b45011aa3..af96d0ab6 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -283,6 +283,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) indexRequests.add(indexRequest) } + log.debug("bulk inserting percolate [${queries.size}] queries") if (indexRequests.isNotEmpty()) { val bulkResponse: BulkResponse = client.suspendUntil { client.bulk( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 3ef5e8be8..5d802f074 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -7,12 +7,14 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.indices.close.CloseIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexResponse import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.admin.indices.open.OpenIndexRequest import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest import org.opensearch.action.search.SearchRequest @@ -24,6 +26,7 @@ import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.transport.AlertingSingleNodeTestCase import org.opensearch.alerting.util.DocLevelMonitorQueries +import org.opensearch.alerting.util.DocLevelMonitorQueries.Companion.INDEX_PATTERN_SUFFIX import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest @@ -41,6 +44,7 @@ import org.opensearch.commons.alerting.model.Table import org.opensearch.index.mapper.MapperService import org.opensearch.index.query.MatchQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.script.Script import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime @@ -573,6 +577,109 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match all 7 queries", 7, findings[0].docLevelQueries.size) } + fun `test monitor error alert created and updated with new error`() { + val docQuery = DocLevelQuery(query = "source:12345", name = "1") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val testDoc = """{ + "message" : "This is an error from IAD region" + }""" + + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + + // Close index to force error alert + client().admin().indices().close(CloseIndexRequest(index)).get() + + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + var table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage == "IndexClosedException[closed]") + // Reopen index + client().admin().indices().open(OpenIndexRequest(index)).get() + // Close queryIndex + client().admin().indices().close(CloseIndexRequest(DOC_LEVEL_QUERIES_INDEX + INDEX_PATTERN_SUFFIX)).get() + + indexDoc(index, "1", testDoc) + + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + table = Table("asc", "id", null, 10, 0, "") + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].errorHistory[0].message == "IndexClosedException[closed]") + Assert.assertEquals(1, getAlertsResponse.alerts[0].errorHistory.size) + Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage!!.contains("Failed to run percolate search")) + } + + fun `test monitor error alert created trigger run errored 2 times same error`() { + val docQuery = DocLevelQuery(query = "source:12345", name = "1") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = Script("invalid script code")) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + + var executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + var table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage!!.contains("Trigger errors")) + + val oldAlertStartTime = getAlertsResponse.alerts[0].startTime + + executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + table = Table("asc", "id", null, 10, 0, "") + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertEquals(0, getAlertsResponse.alerts[0].errorHistory.size) + Assert.assertTrue(getAlertsResponse.alerts[0].errorMessage!!.contains("Trigger errors")) + Assert.assertTrue(getAlertsResponse.alerts[0].startTime.isAfter(oldAlertStartTime)) + } + fun `test execute monitor with custom query index and nested mappings`() { val docQuery1 = DocLevelQuery(query = "message:\"msg 1 2 3 4\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index 6252332a2..5752bc82a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -216,6 +216,9 @@ class AlertIndicesIT : AlertingRestTestCase() { // Check if alert is active and alert index is created val activeAlert = searchAlerts(monitor) assertEquals("1 alert should be active", 1, activeAlert.size) + + waitUntil { return@waitUntil getAlertIndices().size == 2 } + assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) // History index is created but is empty assertEquals(0, getAlertHistoryDocCount()) @@ -273,6 +276,9 @@ class AlertIndicesIT : AlertingRestTestCase() { // Check if alert is active and alert index is created val activeAlert = searchAlerts(monitor) assertEquals("1 alert should be active", 1, activeAlert.size) + + waitUntil { return@waitUntil getAlertIndices().size == 2 } + assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) // History index is created but is empty assertEquals(0, getAlertHistoryDocCount())