diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 597bee2fb..5bc51afdd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -320,8 +320,8 @@ class AlertService( DeleteRequest(AlertIndices.ALERT_INDEX, alert.id) .routing(alert.monitorId), // Only add completed alert to history index if history is enabled - if (alertIndices.isHistoryEnabled()) { - IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) + if (alertIndices.isAlertHistoryEnabled()) { + IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) .routing(alert.monitorId) .source(alert.toXContentWithUser(XContentFactory.jsonBuilder())) .id(alert.id) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 1b25fcfcf..d1205e1ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -321,7 +321,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroDestinationSettings.EMAIL_USERNAME, LegacyOpenDistroDestinationSettings.EMAIL_PASSWORD, LegacyOpenDistroDestinationSettings.ALLOW_LIST, - LegacyOpenDistroDestinationSettings.HOST_DENY_LIST + LegacyOpenDistroDestinationSettings.HOST_DENY_LIST, + AlertingSettings.FINDING_HISTORY_ENABLED, + AlertingSettings.FINDING_HISTORY_MAX_DOCS, + AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, + AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 4bad6de19..63157aede 100644 --- 
a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -52,7 +52,7 @@ object BucketLevelMonitorRunner : MonitorRunner { var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) val currentAlerts = try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex() - monitorCtx.alertIndices!!.createOrUpdateInitialHistoryIndex() + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 3af1c974a..918577e95 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob @@ -67,6 +68,18 @@ object DocumentReturningMonitorRunner : MonitorRunner { ): MonitorRunResult { logger.info("Document-level-monitor is running ...") var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) + + // TODO: is this needed from Charlie? 
+ try { + monitorCtx.alertIndices!!.createOrUpdateAlertIndex() + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() + monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex() + } catch (e: Exception) { + val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id + logger.error("Error loading alerts for monitor: $id", e) + return monitorResult.copy(error = e) + } + try { validate(monitor) } catch (e: Exception) { @@ -246,7 +259,7 @@ object DocumentReturningMonitorRunner : MonitorRunner { ): String { val finding = Finding( id = UUID.randomUUID().toString(), - relatedDocId = matchingDocIds.joinToString(","), + relatedDocIds = matchingDocIds, monitorId = monitor.id, monitorName = monitor.name, index = index, @@ -259,9 +272,11 @@ object DocumentReturningMonitorRunner : MonitorRunner { logger.info("Findings: $findingStr") // todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. - val indexRequest = IndexRequest(".opensearch-alerting-findings") + val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(findingStr, XContentType.JSON) + .id(finding.id) + .routing(finding.id) monitorCtx.client!!.index(indexRequest).actionGet() return finding.id diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 964b2384c..7dfca5ec7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -175,7 +175,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon launch { try { monitorCtx.moveAlertsRetryPolicy!!.retry(logger) { - if (monitorCtx.alertIndices!!.isInitialized()) { + if (monitorCtx.alertIndices!!.isAlertInitialized()) { moveAlerts(monitorCtx.client!!, job.id, job) 
} } @@ -189,7 +189,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon launch { try { monitorCtx.moveAlertsRetryPolicy!!.retry(logger) { - if (monitorCtx.alertIndices!!.isInitialized()) { + if (monitorCtx.alertIndices!!.isAlertInitialized()) { moveAlerts(monitorCtx.client!!, jobId, null) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index c520ed672..568ab45b9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -45,7 +45,7 @@ object QueryLevelMonitorRunner : MonitorRunner { var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) val currentAlerts = try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex() - monitorCtx.alertIndices!!.createOrUpdateInitialHistoryIndex() + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index 85c20e0cc..a37cb7a0b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -21,19 +21,26 @@ import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_HISTORY_WRITE_INDEX import 
org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX -import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_INDEX_MAX_AGE import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_MAX_DOCS +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_RETENTION_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ROLLOVER_PERIOD +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_INDEX_MAX_AGE +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_MAX_DOCS +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue @@ -44,14 +51,15 @@ import java.time.Instant /** * Class to manage the creation and rollover of alert indices and alert history indices. In progress alerts are stored - * in [ALERT_INDEX]. 
Completed alerts are written to [HISTORY_WRITE_INDEX] which is an alias that points at the - * current index to which completed alerts are written. [HISTORY_WRITE_INDEX] is periodically rolled over to a new + * in [ALERT_INDEX]. Completed alerts are written to [ALERT_HISTORY_WRITE_INDEX] which is an alias that points at the + * current index to which completed alerts are written. [ALERT_HISTORY_WRITE_INDEX] is periodically rolled over to a new * date based index. The frequency of rolling over indices is controlled by the `opendistro.alerting.alert_rollover_period` setting. * * These indexes are created when first used and are then rolled over every `alert_rollover_period`. The rollover is * initiated on the master node to ensure only a single node tries to roll it over. Once we have a curator functionality * in Scheduled Jobs we can migrate to using that to rollover the index. */ +// TODO: refactor to make a generic version of this class for finding and alerts class AlertIndices( settings: Settings, private val client: Client, @@ -61,17 +69,28 @@ class AlertIndices( init { clusterService.addListener(this) - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_ENABLED) { historyEnabled = it } - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_MAX_DOCS) { historyMaxDocs = it } - clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_INDEX_MAX_AGE) { historyMaxAge = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_ENABLED) { alertHistoryEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_MAX_DOCS) { alertHistoryMaxDocs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_INDEX_MAX_AGE) { alertHistoryMaxAge = it } clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_ROLLOVER_PERIOD) { - historyRolloverPeriod = it - rescheduleRollover() + alertHistoryRolloverPeriod = it + rescheduleAlertRollover() } - 
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD) { - historyRetentionPeriod = it + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERT_HISTORY_RETENTION_PERIOD) { + alertHistoryRetentionPeriod = it } clusterService.clusterSettings.addSettingsUpdateConsumer(REQUEST_TIMEOUT) { requestTimeout = it } + + clusterService.clusterSettings.addSettingsUpdateConsumer(FINDING_HISTORY_ENABLED) { findingHistoryEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(FINDING_HISTORY_MAX_DOCS) { findingHistoryMaxDocs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(FINDING_HISTORY_INDEX_MAX_AGE) { findingHistoryMaxAge = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(FINDING_HISTORY_ROLLOVER_PERIOD) { + findingHistoryRolloverPeriod = it + rescheduleFindingRollover() + } + clusterService.clusterSettings.addSettingsUpdateConsumer(FINDING_HISTORY_RETENTION_PERIOD) { + findingHistoryRetentionPeriod = it + } } companion object { @@ -80,33 +99,54 @@ class AlertIndices( const val ALERT_INDEX = ".opendistro-alerting-alerts" /** The alias of the index in which to write alert history */ - const val HISTORY_WRITE_INDEX = ".opendistro-alerting-alert-history-write" + const val ALERT_HISTORY_WRITE_INDEX = ".opendistro-alerting-alert-history-write" + + /** The alias of the index in which to write alert finding */ + const val FINDING_HISTORY_WRITE_INDEX = ".opensearch-alerting-finding-history-write" + + /** The index name pattern referring to all alert history indices */ + const val ALERT_HISTORY_ALL = ".opendistro-alerting-alert-history*" /** The index name pattern referring to all alert history indices */ - const val HISTORY_ALL = ".opendistro-alerting-alert-history*" + const val FINDING_HISTORY_ALL = ".opensearch-alerting-finding-history*" /** The index name pattern to create alert history indices */ - const val HISTORY_INDEX_PATTERN = 
"<.opendistro-alerting-alert-history-{now/d}-1>" + const val ALERT_HISTORY_INDEX_PATTERN = "<.opendistro-alerting-alert-history-{now/d}-1>" + + /** The index name pattern to create finding history indices */ + const val FINDING_HISTORY_INDEX_PATTERN = "<.opensearch-alerting-finding-history-{now/d}-1>" /** The index name pattern to query all alerts, history and current alerts. */ - const val ALL_INDEX_PATTERN = ".opendistro-alerting-alert*" + const val ALL_ALERT_INDEX_PATTERN = ".opendistro-alerting-alert*" + + /** The index name pattern to query all findings, history and current findings. */ + const val ALL_FINDING_INDEX_PATTERN = ".opensearch-alerting-finding*" @JvmStatic fun alertMapping() = AlertIndices::class.java.getResource("alert_mapping.json").readText() + @JvmStatic + fun findingMapping() = + AlertIndices::class.java.getResource("finding_mapping.json").readText() + private val logger = LogManager.getLogger(AlertIndices::class.java) } - @Volatile private var historyEnabled = AlertingSettings.ALERT_HISTORY_ENABLED.get(settings) + @Volatile private var alertHistoryEnabled = AlertingSettings.ALERT_HISTORY_ENABLED.get(settings) + @Volatile private var findingHistoryEnabled = AlertingSettings.FINDING_HISTORY_ENABLED.get(settings) - @Volatile private var historyMaxDocs = AlertingSettings.ALERT_HISTORY_MAX_DOCS.get(settings) + @Volatile private var alertHistoryMaxDocs = AlertingSettings.ALERT_HISTORY_MAX_DOCS.get(settings) + @Volatile private var findingHistoryMaxDocs = AlertingSettings.FINDING_HISTORY_MAX_DOCS.get(settings) - @Volatile private var historyMaxAge = AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.get(settings) + @Volatile private var alertHistoryMaxAge = AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.get(settings) + @Volatile private var findingHistoryMaxAge = AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE.get(settings) - @Volatile private var historyRolloverPeriod = AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.get(settings) + @Volatile private var 
alertHistoryRolloverPeriod = AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.get(settings) + @Volatile private var findingHistoryRolloverPeriod = AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD.get(settings) - @Volatile private var historyRetentionPeriod = AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.get(settings) + @Volatile private var alertHistoryRetentionPeriod = AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.get(settings) + @Volatile private var findingHistoryRetentionPeriod = AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD.get(settings) @Volatile private var requestTimeout = AlertingSettings.REQUEST_TIMEOUT.get(settings) @@ -115,7 +155,9 @@ class AlertIndices( // for JobsMonitor to report var lastRolloverTime: TimeValue? = null - private var historyIndexInitialized: Boolean = false + private var alertHistoryIndexInitialized: Boolean = false + + private var findingHistoryIndexInitialized: Boolean = false private var alertIndexInitialized: Boolean = false @@ -124,15 +166,18 @@ class AlertIndices( fun onMaster() { try { // try to rollover immediately as we might be restarting the cluster - rolloverHistoryIndex() + rolloverAlertHistoryIndex() + rolloverFindingHistoryIndex() // schedule the next rollover for approx MAX_AGE later scheduledRollover = threadPool - .scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndices() }, historyRolloverPeriod, executorName()) + .scheduleWithFixedDelay({ rolloverAndDeleteAlertHistoryIndices() }, alertHistoryRolloverPeriod, executorName()) + scheduledRollover = threadPool + .scheduleWithFixedDelay({ rolloverAndDeleteFindingHistoryIndices() }, findingHistoryRolloverPeriod, executorName()) } catch (e: Exception) { // This should be run on cluster startup logger.error( - "Error creating alert indices. " + - "Alerts can't be recorded until master node is restarted.", + "Error creating alert/finding indices. 
" + + "Alerts/Findings can't be recorded until master node is restarted.", e ) } @@ -161,45 +206,74 @@ class AlertIndices( // if the indexes have been deleted they need to be reinitialized alertIndexInitialized = event.state().routingTable().hasIndex(ALERT_INDEX) - historyIndexInitialized = event.state().metadata().hasAlias(HISTORY_WRITE_INDEX) + alertHistoryIndexInitialized = event.state().metadata().hasAlias(ALERT_HISTORY_WRITE_INDEX) + findingHistoryIndexInitialized = event.state().metadata().hasAlias(FINDING_HISTORY_WRITE_INDEX) + } + + private fun rescheduleAlertRollover() { + if (clusterService.state().nodes.isLocalNodeElectedMaster) { + scheduledRollover?.cancel() + scheduledRollover = threadPool + .scheduleWithFixedDelay({ rolloverAndDeleteAlertHistoryIndices() }, alertHistoryRolloverPeriod, executorName()) + } } - private fun rescheduleRollover() { + private fun rescheduleFindingRollover() { if (clusterService.state().nodes.isLocalNodeElectedMaster) { scheduledRollover?.cancel() scheduledRollover = threadPool - .scheduleWithFixedDelay({ rolloverAndDeleteHistoryIndices() }, historyRolloverPeriod, executorName()) + .scheduleWithFixedDelay({ rolloverAndDeleteFindingHistoryIndices() }, findingHistoryRolloverPeriod, executorName()) } } - fun isInitialized(): Boolean { - return alertIndexInitialized && historyIndexInitialized + fun isAlertInitialized(): Boolean { + return alertIndexInitialized && alertHistoryIndexInitialized } - fun isHistoryEnabled(): Boolean = historyEnabled + fun isAlertHistoryEnabled(): Boolean = alertHistoryEnabled + + fun isFindingHistoryEnabled(): Boolean = findingHistoryEnabled suspend fun createOrUpdateAlertIndex() { if (!alertIndexInitialized) { - alertIndexInitialized = createIndex(ALERT_INDEX) + alertIndexInitialized = createIndex(ALERT_INDEX, alertMapping()) if (alertIndexInitialized) IndexUtils.alertIndexUpdated() } else { - if (!IndexUtils.alertIndexUpdated) updateIndexMapping(ALERT_INDEX) + if (!IndexUtils.alertIndexUpdated) 
updateIndexMapping(ALERT_INDEX, alertMapping()) } alertIndexInitialized } - suspend fun createOrUpdateInitialHistoryIndex() { - if (!historyIndexInitialized) { - historyIndexInitialized = createIndex(HISTORY_INDEX_PATTERN, HISTORY_WRITE_INDEX) - if (historyIndexInitialized) - IndexUtils.lastUpdatedHistoryIndex = IndexUtils.getIndexNameWithAlias(clusterService.state(), HISTORY_WRITE_INDEX) + suspend fun createOrUpdateInitialAlertHistoryIndex() { + if (!alertHistoryIndexInitialized) { + alertHistoryIndexInitialized = createIndex(ALERT_HISTORY_INDEX_PATTERN, alertMapping(), ALERT_HISTORY_WRITE_INDEX) + if (alertHistoryIndexInitialized) + IndexUtils.lastUpdatedAlertHistoryIndex = IndexUtils.getIndexNameWithAlias( + clusterService.state(), + ALERT_HISTORY_WRITE_INDEX + ) } else { - updateIndexMapping(HISTORY_WRITE_INDEX, true) + updateIndexMapping(ALERT_HISTORY_WRITE_INDEX, alertMapping(), true) } - historyIndexInitialized + alertHistoryIndexInitialized } - private suspend fun createIndex(index: String, alias: String? = null): Boolean { + suspend fun createOrUpdateInitialFindingHistoryIndex() { + if (!findingHistoryIndexInitialized) { + findingHistoryIndexInitialized = createIndex(FINDING_HISTORY_INDEX_PATTERN, findingMapping(), FINDING_HISTORY_WRITE_INDEX) + if (findingHistoryIndexInitialized) { + IndexUtils.lastUpdatedFindingHistoryIndex = IndexUtils.getIndexNameWithAlias( + clusterService.state(), + FINDING_HISTORY_WRITE_INDEX + ) + } + } else { + updateIndexMapping(FINDING_HISTORY_WRITE_INDEX, findingMapping(), true) + } + findingHistoryIndexInitialized + } + + private suspend fun createIndex(index: String, schemaMapping: String, alias: String? = null): Boolean { // This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster // state does not contain the index and multiple nodes concurrently try to create the index. 
// If it does happen that error is handled we catch the ResourceAlreadyExistsException @@ -209,7 +283,7 @@ class AlertIndices( if (existsResponse.isExists) return true val request = CreateIndexRequest(index) - .mapping(alertMapping()) + .mapping(schemaMapping) .settings(Settings.builder().put("index.hidden", true).build()) if (alias != null) request.alias(Alias(alias)) @@ -221,15 +295,14 @@ class AlertIndices( } } - private suspend fun updateIndexMapping(index: String, alias: Boolean = false) { + private suspend fun updateIndexMapping(index: String, mapping: String, alias: Boolean = false) { val clusterState = clusterService.state() - val mapping = alertMapping() var targetIndex = index if (alias) { targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index) } - if (targetIndex == IndexUtils.lastUpdatedHistoryIndex) { + if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) { return } @@ -247,63 +320,92 @@ class AlertIndices( private fun setIndexUpdateFlag(index: String, targetIndex: String) { when (index) { ALERT_INDEX -> IndexUtils.alertIndexUpdated() - HISTORY_WRITE_INDEX -> IndexUtils.lastUpdatedHistoryIndex = targetIndex + ALERT_HISTORY_WRITE_INDEX -> IndexUtils.lastUpdatedAlertHistoryIndex = targetIndex + FINDING_HISTORY_WRITE_INDEX -> IndexUtils.lastUpdatedFindingHistoryIndex = targetIndex } } - private fun rolloverAndDeleteHistoryIndices() { - if (historyEnabled) rolloverHistoryIndex() - deleteOldHistoryIndices() + private fun rolloverAndDeleteAlertHistoryIndices() { + if (alertHistoryEnabled) rolloverAlertHistoryIndex() + deleteOldIndices("History", ALERT_HISTORY_ALL) + } + + private fun rolloverAndDeleteFindingHistoryIndices() { + if (findingHistoryEnabled) rolloverFindingHistoryIndex() + deleteOldIndices("Finding", FINDING_HISTORY_ALL) } - private fun rolloverHistoryIndex() { - if (!historyIndexInitialized) { + private fun rolloverIndex( + initialized: Boolean, + index: String, + 
pattern: String, + map: String, + docsCondition: Long, + ageCondition: TimeValue, + writeIndex: String + ) { + if (!initialized) { return } // We have to pass null for newIndexName in order to get Elastic to increment the index count. - val request = RolloverRequest(HISTORY_WRITE_INDEX, null) - request.createIndexRequest.index(HISTORY_INDEX_PATTERN) - .mapping(alertMapping()) + val request = RolloverRequest(index, null) + request.createIndexRequest.index(pattern) + .mapping(map) .settings(Settings.builder().put("index.hidden", true).build()) - request.addMaxIndexDocsCondition(historyMaxDocs) - request.addMaxIndexAgeCondition(historyMaxAge) + request.addMaxIndexDocsCondition(docsCondition) + request.addMaxIndexAgeCondition(ageCondition) client.admin().indices().rolloverIndex( request, object : ActionListener { override fun onResponse(response: RolloverResponse) { if (!response.isRolledOver) { - logger.info("$HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus}") + logger.info("$writeIndex not rolled over. 
Conditions were: ${response.conditionStatus}") } else { lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis()) } } override fun onFailure(e: Exception) { - logger.error("$HISTORY_WRITE_INDEX not roll over failed.") + logger.error("$writeIndex not roll over failed.") } } ) } - private fun deleteOldHistoryIndices() { + private fun rolloverAlertHistoryIndex() { + rolloverIndex( + alertHistoryIndexInitialized, ALERT_HISTORY_WRITE_INDEX, + ALERT_HISTORY_INDEX_PATTERN, alertMapping(), + alertHistoryMaxDocs, alertHistoryMaxAge, ALERT_HISTORY_WRITE_INDEX + ) + } + + private fun rolloverFindingHistoryIndex() { + rolloverIndex( + findingHistoryIndexInitialized, FINDING_HISTORY_WRITE_INDEX, + FINDING_HISTORY_INDEX_PATTERN, findingMapping(), + findingHistoryMaxDocs, findingHistoryMaxAge, FINDING_HISTORY_WRITE_INDEX + ) + } + private fun deleteOldIndices(tag: String, indices: String) { + logger.error("info deleteOldIndices") val clusterStateRequest = ClusterStateRequest() .clear() - .indices(HISTORY_ALL) + .indices(indices) .metadata(true) .local(true) .indicesOptions(IndicesOptions.strictExpand()) - client.admin().cluster().state( clusterStateRequest, object : ActionListener { override fun onResponse(clusterStateResponse: ClusterStateResponse) { if (!clusterStateResponse.state.metadata.indices.isEmpty) { val indicesToDelete = getIndicesToDelete(clusterStateResponse) - logger.info("Deleting old history indices viz $indicesToDelete") + logger.info("Deleting old $tag indices viz $indicesToDelete") deleteAllOldHistoryIndices(indicesToDelete) } else { - logger.info("No Old History Indices to delete") + logger.info("No Old $tag Indices to delete") } } override fun onFailure(e: Exception) { @@ -317,24 +419,39 @@ class AlertIndices( val indicesToDelete = mutableListOf() for (entry in clusterStateResponse.state.metadata.indices) { val indexMetaData = entry.value - val creationTime = indexMetaData.creationDate - - if ((Instant.now().toEpochMilli() - 
creationTime) > historyRetentionPeriod.millis) { - val alias = indexMetaData.aliases.firstOrNull { HISTORY_WRITE_INDEX == it.value.alias } - if (alias != null) { - if (historyEnabled) { - // If the index has the write alias and history is enabled, don't delete the index - continue - } else { - // Otherwise reset historyIndexInitialized since index will be deleted - historyIndexInitialized = false - } - } + getHistoryIndexToDelete(indexMetaData, alertHistoryRetentionPeriod.millis, ALERT_HISTORY_WRITE_INDEX, alertHistoryEnabled) + ?.let { indicesToDelete.add(it) } + getHistoryIndexToDelete(indexMetaData, findingHistoryRetentionPeriod.millis, FINDING_HISTORY_WRITE_INDEX, findingHistoryEnabled) + ?.let { indicesToDelete.add(it) } + } + return indicesToDelete + } - indicesToDelete.add(indexMetaData.index.name) + private fun getHistoryIndexToDelete( + indexMetadata: IndexMetadata, + retentionPeriodMillis: Long, + writeIndex: String, + historyEnabled: Boolean + ): String? { + val creationTime = indexMetadata.creationDate + if ((Instant.now().toEpochMilli() - creationTime) > retentionPeriodMillis) { + val alias = indexMetadata.aliases.firstOrNull { writeIndex == it.value.alias } + if (alias != null) { + if (historyEnabled) { + // If the index has the write alias and history is enabled, don't delete the index + return null + } else if (writeIndex == ALERT_HISTORY_WRITE_INDEX) { + // Otherwise reset alertHistoryIndexInitialized since index will be deleted + alertHistoryIndexInitialized = false + } else if (writeIndex == FINDING_HISTORY_WRITE_INDEX) { + // Otherwise reset findingHistoryIndexInitialized since index will be deleted + findingHistoryIndexInitialized = false + } } + + return indexMetadata.index.name } - return indicesToDelete + return null } private fun deleteAllOldHistoryIndices(indicesToDelete: List) { @@ -345,12 +462,14 @@ class AlertIndices( object : ActionListener { override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) { if 
(!deleteIndicesResponse.isAcknowledged) { - logger.error("Could not delete one or more Alerting history indices: $indicesToDelete. Retrying one by one.") + logger.error( + "Could not delete one or more Alerting/Finding history indices: $indicesToDelete. Retrying one by one." + ) deleteOldHistoryIndex(indicesToDelete) } } override fun onFailure(e: Exception) { - logger.error("Delete for Alerting History Indices $indicesToDelete Failed. Retrying one By one.") + logger.error("Delete for Alerting/Finding History Indices $indicesToDelete Failed. Retrying one By one.") deleteOldHistoryIndex(indicesToDelete) } } @@ -367,7 +486,7 @@ class AlertIndices( override fun onResponse(acknowledgedResponse: AcknowledgedResponse?) { if (acknowledgedResponse != null) { if (!acknowledgedResponse.isAcknowledged) { - logger.error("Could not delete one or more Alerting history indices: $index") + logger.error("Could not delete one or more Alerting/Finding history indices: $index") } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt index 880faa332..aa6f1863c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt @@ -11,8 +11,8 @@ import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_HISTORY_WRITE_INDEX import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX -import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor @@ -37,7 +37,7 @@ import org.opensearch.search.builder.SearchSourceBuilder * 
1. Find active alerts: * a. matching monitorId if no monitor is provided (postDelete) * b. matching monitorId and no triggerIds if monitor is provided (postIndex) - * 2. Move alerts over to [HISTORY_WRITE_INDEX] as DELETED + * 2. Move alerts over to [ALERT_HISTORY_WRITE_INDEX] as DELETED * 3. Delete alerts from [ALERT_INDEX] * 4. Schedule a retry if there were any failures */ @@ -61,7 +61,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu // If no alerts are found, simply return if (response.hits.totalHits?.value == 0L) return val indexRequests = response.hits.map { hit -> - IndexRequest(AlertIndices.HISTORY_WRITE_INDEX) + IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) .routing(monitorId) .source( Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt index 929b99103..e34fa9510 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Finding.kt @@ -22,7 +22,7 @@ import java.time.Instant */ class Finding( val id: String = NO_ID, - val relatedDocId: String, + val relatedDocIds: List, val monitorId: String, val monitorName: String, val index: String, @@ -33,7 +33,7 @@ class Finding( @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), - relatedDocId = sin.readString(), + relatedDocIds = sin.readStringList(), monitorId = sin.readString(), monitorName = sin.readString(), index = sin.readString(), @@ -44,7 +44,7 @@ class Finding( fun asTemplateArg(): Map { return mapOf( FINDING_ID_FIELD to id, - RELATED_DOC_ID_FIELD to relatedDocId, + RELATED_DOC_IDS_FIELD to relatedDocIds, MONITOR_ID_FIELD to monitorId, MONITOR_NAME_FIELD to monitorName, INDEX_FIELD to index, @@ -56,7 +56,7 @@ class Finding( override fun toXContent(builder: XContentBuilder, params: 
ToXContent.Params): XContentBuilder { builder.startObject() .field(FINDING_ID_FIELD, id) - .field(RELATED_DOC_ID_FIELD, relatedDocId) + .field(RELATED_DOC_IDS_FIELD, relatedDocIds) .field(MONITOR_ID_FIELD, monitorId) .field(MONITOR_NAME_FIELD, monitorName) .field(INDEX_FIELD, index) @@ -69,7 +69,7 @@ class Finding( @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(id) - out.writeString(relatedDocId) + out.writeStringCollection(relatedDocIds) out.writeString(monitorId) out.writeString(monitorName) out.writeString(index) @@ -79,7 +79,7 @@ class Finding( companion object { const val FINDING_ID_FIELD = "id" - const val RELATED_DOC_ID_FIELD = "related_doc_id" + const val RELATED_DOC_IDS_FIELD = "related_doc_ids" const val MONITOR_ID_FIELD = "monitor_id" const val MONITOR_NAME_FIELD = "monitor_name" const val INDEX_FIELD = "index" @@ -91,7 +91,7 @@ class Finding( @Throws(IOException::class) fun parse(xcp: XContentParser): Finding { var id: String = NO_ID - lateinit var relatedDocId: String + val relatedDocIds: MutableList = mutableListOf() lateinit var monitorId: String lateinit var monitorName: String lateinit var index: String @@ -105,7 +105,12 @@ class Finding( when (fieldName) { FINDING_ID_FIELD -> id = xcp.text() - RELATED_DOC_ID_FIELD -> relatedDocId = xcp.text() + RELATED_DOC_IDS_FIELD -> { + ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + relatedDocIds.add(xcp.text()) + } + } MONITOR_ID_FIELD -> monitorId = xcp.text() MONITOR_NAME_FIELD -> monitorName = xcp.text() INDEX_FIELD -> index = xcp.text() @@ -123,7 +128,7 @@ class Finding( return Finding( id = id, - relatedDocId = relatedDocId, + relatedDocIds = relatedDocIds, monitorId = monitorId, monitorName = monitorName, index = index, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt 
b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt index 430b143c4..44446bb78 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestSearchMonitorAction.kt @@ -11,7 +11,7 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.action.SearchMonitorRequest -import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_INDEX_PATTERN +import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.model.Monitor @@ -89,7 +89,7 @@ class RestSearchMonitorAction( log.debug("${request.method()} ${AlertingPlugin.MONITOR_BASE_URI}/_search") val index = request.param("index", SCHEDULED_JOBS_INDEX) - if (index != SCHEDULED_JOBS_INDEX && index != ALL_INDEX_PATTERN) { + if (index != SCHEDULED_JOBS_INDEX && index != ALL_ALERT_INDEX_PATTERN) { throw IllegalArgumentException("Invalid index name.") } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 71f1bae0c..1268703c9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -7,6 +7,8 @@ package org.opensearch.alerting.settings import org.opensearch.alerting.AlertingPlugin import org.opensearch.common.settings.Setting +import org.opensearch.common.unit.TimeValue +import java.util.concurrent.TimeUnit /** * settings specific to [AlertingPlugin]. 
These settings include things like history index max age, request timeout, etc... @@ -73,30 +75,63 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + // TODO: Do we want to let users to disable this? If so, we need to fix the rollover logic + // such that the main index is findings and rolls over to the finding history index + val FINDING_HISTORY_ENABLED = Setting.boolSetting( + "plugins.alerting.alert_finding_enabled", + true, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val ALERT_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.alert_history_rollover_period", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD, Setting.Property.NodeScope, Setting.Property.Dynamic ) + val FINDING_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( + "plugins.alerting.alert_finding_rollover_period", + TimeValue.timeValueHours(12), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( "plugins.alerting.alert_history_max_age", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE, Setting.Property.NodeScope, Setting.Property.Dynamic ) + val FINDING_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( + "plugins.alerting.finding_history_max_age", + TimeValue(30, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val ALERT_HISTORY_MAX_DOCS = Setting.longSetting( "plugins.alerting.alert_history_max_docs", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_MAX_DOCS, Setting.Property.NodeScope, Setting.Property.Dynamic ) + val FINDING_HISTORY_MAX_DOCS = Setting.longSetting( + "plugins.alerting.alert_finding_max_docs", + 1000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated + ) + val ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.alert_history_retention_period", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, 
Setting.Property.NodeScope, Setting.Property.Dynamic ) + val FINDING_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( + "plugins.alerting.finding_history_retention_period", + TimeValue(60, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.request_timeout", LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/LegacyOpenDistroAlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/LegacyOpenDistroAlertingSettings.kt index be8a7d437..387b6cec9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/LegacyOpenDistroAlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/LegacyOpenDistroAlertingSettings.kt @@ -99,6 +99,12 @@ class LegacyOpenDistroAlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated ) + val ALERT_FINDING_RETENTION_PERIOD = Setting.positiveTimeSetting( + "opendistro.alerting.alert_finding_retention_period", + TimeValue(60, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated + ) + val REQUEST_TIMEOUT = Setting.positiveTimeSetting( "opendistro.alerting.request_timeout", TimeValue.timeValueSeconds(10), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt index 908b895cd..523175a35 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.ActionListener import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse +import 
org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters @@ -20,9 +22,12 @@ import org.opensearch.alerting.action.AcknowledgeAlertResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.elasticapi.optionalTimeField import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentFactory @@ -41,12 +46,20 @@ private val log = LogManager.getLogger(TransportAcknowledgeAlertAction::class.ja class TransportAcknowledgeAlertAction @Inject constructor( transportService: TransportService, val client: Client, + clusterService: ClusterService, actionFilters: ActionFilters, + val settings: Settings, val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( AcknowledgeAlertAction.NAME, transportService, actionFilters, ::AcknowledgeAlertRequest ) { + @Volatile private var isAlertHistoryEnabled = AlertingSettings.ALERT_HISTORY_ENABLED.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERT_HISTORY_ENABLED) { isAlertHistoryEnabled = it } + } + override fun doExecute(task: Task, request: AcknowledgeAlertRequest, actionListener: ActionListener) { client.threadPool().threadContext.stashContext().use { AcknowledgeHandler(client, actionListener, request).start() @@ -92,7 +105,9 @@ class TransportAcknowledgeAlertAction @Inject constructor( } private fun 
onSearchResponse(response: SearchResponse) { - val updateRequests = response.hits.flatMap { hit -> + val updateRequests = mutableListOf() + val copyRequests = mutableListOf() + response.hits.forEach { hit -> val xcp = XContentHelper.createParser( xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceRef, XContentType.JSON @@ -100,9 +115,10 @@ class TransportAcknowledgeAlertAction @Inject constructor( XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) val alert = Alert.parse(xcp, hit.id, hit.version) alerts[alert.id] = alert + if (alert.state == Alert.State.ACTIVE) { - listOf( - UpdateRequest(AlertIndices.ALERT_INDEX, hit.id) + if (alert.findingIds.isEmpty() || !isAlertHistoryEnabled) { + val updateRequest = UpdateRequest(AlertIndices.ALERT_INDEX, alert.id) .routing(request.monitorId) .setIfSeqNo(hit.seqNo) .setIfPrimaryTerm(hit.primaryTerm) @@ -112,41 +128,48 @@ class TransportAcknowledgeAlertAction @Inject constructor( .optionalTimeField(Alert.ACKNOWLEDGED_TIME_FIELD, Instant.now()) .endObject() ) - ) - } else { - emptyList() + updateRequests.add(updateRequest) + } else { + val copyRequest = IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX) + .routing(request.monitorId) + .id(alert.id) + .source( + alert.copy(state = Alert.State.ACKNOWLEDGED, acknowledgedTime = Instant.now()) + .toXContentWithUser(XContentFactory.jsonBuilder()) + ) + copyRequests.add(copyRequest) + } } } - log.info("Acknowledging monitor: $request.monitorId, alerts: ${updateRequests.map { it.id() }}") - val bulkRequest = BulkRequest().add(updateRequests).setRefreshPolicy(request.refreshPolicy) - client.bulk( - bulkRequest, - object : ActionListener { - override fun onResponse(response: BulkResponse) { - onBulkResponse(response) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } - ) + try { + val updateResponse = if (updateRequests.isNotEmpty()) + 
client.bulk(BulkRequest().add(updateRequests).setRefreshPolicy(request.refreshPolicy)).actionGet() + else null + val copyResponse = if (copyRequests.isNotEmpty()) + client.bulk(BulkRequest().add(copyRequests).setRefreshPolicy(request.refreshPolicy)).actionGet() + else null + onBulkResponse(updateResponse, copyResponse) + } catch (t: Exception) { + log.error("ack error: ${t.message}") + actionListener.onFailure(AlertingException.wrap(t)) + } } - private fun onBulkResponse(response: BulkResponse) { + private fun onBulkResponse(updateResponse: BulkResponse?, copyResponse: BulkResponse?) { + val deleteRequests = mutableListOf() val missing = request.alertIds.toMutableSet() val acknowledged = mutableListOf() val failed = mutableListOf() - // First handle all alerts that aren't currently ACTIVE. These can't be acknowledged. + alerts.values.forEach { if (it.state != Alert.State.ACTIVE) { missing.remove(it.id) failed.add(it) } } - // Now handle all alerts we tried to acknowledge... - response.items.forEach { item -> + + updateResponse?.items?.forEach { item -> missing.remove(item.id) if (item.isFailed) { failed.add(alerts[item.id]!!) @@ -154,6 +177,36 @@ class TransportAcknowledgeAlertAction @Inject constructor( acknowledged.add(alerts[item.id]!!) } } + + copyResponse?.items?.forEach { item -> + log.info("got a copyResponse: $item") + missing.remove(item.id) + if (item.isFailed) { + log.info("got a failureResponse: ${item.failureMessage}") + failed.add(alerts[item.id]!!) + } else { + val deleteRequest = DeleteRequest(AlertIndices.ALERT_INDEX, item.id) + .routing(request.monitorId) + deleteRequests.add(deleteRequest) + } + } + + if (deleteRequests.isNotEmpty()) { + try { + val deleteResponse = client.bulk(BulkRequest().add(deleteRequests).setRefreshPolicy(request.refreshPolicy)).actionGet() + deleteResponse.items.forEach { item -> + missing.remove(item.id) + if (item.isFailed) { + failed.add(alerts[item.id]!!) + } else { + acknowledged.add(alerts[item.id]!!) 
+ } + } + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + return + } + } actionListener.onResponse(AcknowledgeAlertResponse(acknowledged.toList(), failed.toList(), missing.toList())) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 1df607015..a31d5a1ab 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -135,7 +135,7 @@ class TransportGetAlertsAction @Inject constructor( fun search(searchSourceBuilder: SearchSourceBuilder, actionListener: ActionListener) { val searchRequest = SearchRequest() - .indices(AlertIndices.ALL_INDEX_PATTERN) + .indices(AlertIndices.ALL_ALERT_INDEX_PATTERN) .source(searchSourceBuilder) client.search( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index ca9971843..f91d0efd4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -15,6 +15,7 @@ import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.action.GetFindingsAction import org.opensearch.alerting.action.GetFindingsRequest import org.opensearch.alerting.action.GetFindingsResponse +import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN import org.opensearch.alerting.model.Finding import org.opensearch.alerting.model.FindingDocument import org.opensearch.alerting.model.FindingWithDocs @@ -108,7 +109,7 @@ class TransportGetFindingsSearchAction @Inject constructor( fun search(searchSourceBuilder: SearchSourceBuilder, 
actionListener: ActionListener) { val searchRequest = SearchRequest() .source(searchSourceBuilder) - .indices(".opensearch-alerting-findings") + .indices(ALL_FINDING_INDEX_PATTERN) client.search( searchRequest, object : ActionListener { @@ -123,7 +124,7 @@ class TransportGetFindingsSearchAction @Inject constructor( XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) val finding = Finding.parse(xcp) findings.add(finding) - val documentIds = finding.relatedDocId.split(",").toTypedArray() + val documentIds = finding.relatedDocIds // Add getRequests to mget request documentIds.forEach { docId -> @@ -132,7 +133,7 @@ class TransportGetFindingsSearchAction @Inject constructor( } val documents = searchDocument(mgetRequest) findings.forEach { - val documentIds = it.relatedDocId.split(",").toTypedArray() + val documentIds = it.relatedDocIds val relatedDocs = mutableListOf() for (docId in documentIds) { val key = "${it.index}|$docId" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index e0ce289a8..9f299e8c5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -29,16 +29,22 @@ class IndexUtils { private set var alertIndexSchemaVersion: Int private set + var findingIndexSchemaVersion: Int + private set var scheduledJobIndexUpdated: Boolean = false private set var alertIndexUpdated: Boolean = false private set - var lastUpdatedHistoryIndex: String? = null + var findingIndexUpdated: Boolean = false + private set + var lastUpdatedAlertHistoryIndex: String? = null + var lastUpdatedFindingHistoryIndex: String? 
= null init { scheduledJobIndexSchemaVersion = getSchemaVersion(ScheduledJobIndices.scheduledJobMappings()) alertIndexSchemaVersion = getSchemaVersion(AlertIndices.alertMapping()) + findingIndexSchemaVersion = getSchemaVersion(AlertIndices.findingMapping()) } @JvmStatic @@ -51,6 +57,11 @@ class IndexUtils { alertIndexUpdated = true } + @JvmStatic + fun findingIndexUpdated() { + findingIndexUpdated = true + } + @JvmStatic fun getSchemaVersion(mapping: String): Int { val xcp = XContentType.JSON.xContent().createParser( diff --git a/alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json similarity index 91% rename from alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json rename to alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index fc52e3945..c9386b2ef 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/findings/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -1,8 +1,5 @@ { "dynamic": "strict", - "_routing": { - "required": true - }, "_meta" : { "schema_version": 1 }, @@ -37,6 +34,9 @@ "type": "keyword" }, "name": { + "type": "keyword" + }, + "query": { "type": "text" }, "tags": { @@ -50,7 +50,7 @@ } }, "timestamp": { - "type": "date" + "type": "long" } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt index f4e0d6a7c..1cdc5ac1a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt @@ -57,6 +57,11 @@ class AlertServiceTests : OpenSearchTestCase() { settingSet.add(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD) settingSet.add(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD) 
settingSet.add(AlertingSettings.REQUEST_TIMEOUT) + settingSet.add(AlertingSettings.FINDING_HISTORY_ENABLED) + settingSet.add(AlertingSettings.FINDING_HISTORY_MAX_DOCS) + settingSet.add(AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE) + settingSet.add(AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD) + settingSet.add(AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD) val discoveryNode = DiscoveryNode("node", buildNewFakeTransportAddress(), Version.CURRENT) val clusterSettings = ClusterSettings(settings, settingSet) val testClusterService = ClusterServiceUtils.createClusterService(threadPool, discoveryNode, clusterSettings) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 2cfb25d2e..2e0fc9f6d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -443,7 +443,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } protected fun createRandomDocumentMonitor(refresh: Boolean = false, withMetadata: Boolean = false): Monitor { - val monitor = randomDocumentReturningMonitor(withMetadata = withMetadata) + val monitor = randomDocumentLevelMonitor(withMetadata = withMetadata) val monitorId = createMonitor(monitor, refresh).id if (withMetadata) { return getMonitor(monitorId = monitorId, header = BasicHeader(HttpHeaders.USER_AGENT, "OpenSearch-Dashboards")) @@ -515,11 +515,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { monitorName: String = "NO_NAME", index: String = "testIndex", docLevelQueries: List = listOf(DocLevelQuery(query = "test_field:\"us-west-2\"", name = "testQuery")), - matchingDocIds: Set + matchingDocIds: List ): String { val finding = Finding( id = UUID.randomUUID().toString(), - relatedDocId = matchingDocIds.joinToString(","), + relatedDocIds = matchingDocIds, monitorId = monitorId, monitorName = monitorName, 
index = index, @@ -535,7 +535,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { protected fun searchFindings( monitor: Monitor, - indices: String = ".opensearch-alerting-findings", + indices: String = AlertIndices.ALL_FINDING_INDEX_PATTERN, refresh: Boolean = true ): List { if (refresh) refreshIndex(indices) @@ -556,7 +556,12 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } protected fun searchAlerts(monitor: Monitor, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List { - if (refresh) refreshIndex(indices) + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return emptyList() + } // If this is a test monitor (it doesn't have an ID) and no alerts will be saved for it. val searchParams = if (monitor.id != Monitor.NO_ID) mapOf("routing" to monitor.id) else mapOf() @@ -749,10 +754,18 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { fun putAlertMappings(mapping: String? = null) { val mappingHack = if (mapping != null) mapping else AlertIndices.alertMapping().trimStart('{').trimEnd('}') - val encodedHistoryIndex = URLEncoder.encode(AlertIndices.HISTORY_INDEX_PATTERN, Charsets.UTF_8.toString()) + val encodedHistoryIndex = URLEncoder.encode(AlertIndices.ALERT_HISTORY_INDEX_PATTERN, Charsets.UTF_8.toString()) val settings = Settings.builder().put("index.hidden", true).build() createIndex(AlertIndices.ALERT_INDEX, settings, mappingHack) - createIndex(encodedHistoryIndex, settings, mappingHack, "\"${AlertIndices.HISTORY_WRITE_INDEX}\" : {}") + createIndex(encodedHistoryIndex, settings, mappingHack, "\"${AlertIndices.ALERT_HISTORY_WRITE_INDEX}\" : {}") + } + + fun putFindingMappings(mapping: String? 
= null) { + val mappingHack = if (mapping != null) mapping else AlertIndices.findingMapping().trimStart('{').trimEnd('}') + val encodedHistoryIndex = URLEncoder.encode(AlertIndices.FINDING_HISTORY_INDEX_PATTERN, Charsets.UTF_8.toString()) + val settings = Settings.builder().put("index.hidden", true).build() +// createIndex(AlertIndices.FINDING_HISTORY_WRITE_INDEX, settings, mappingHack) + createIndex(encodedHistoryIndex, settings, mappingHack, "\"${AlertIndices.FINDING_HISTORY_WRITE_INDEX}\" : {}") } fun scheduledJobMappings(): String { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index fcc70ffba..bb401dd5d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -7,8 +7,6 @@ package org.opensearch.alerting import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery -import org.opensearch.alerting.model.Alert -import java.time.Instant import java.time.ZonedDateTime import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit.MILLIS @@ -16,22 +14,6 @@ import java.time.temporal.ChronoUnit.MILLIS class DocumentMonitorRunnerIT : AlertingRestTestCase() { fun `test execute monitor with dryrun`() { - val alert = Alert( - monitorId = "monitorId", - monitorName = "monitorName", - monitorVersion = 1L, - monitorUser = null, - triggerId = "triggerId", - triggerName = "triggerName", - findingIds = emptyList(), - relatedDocIds = emptyList(), - state = Alert.State.COMPLETED, - startTime = Instant.now(), - errorHistory = emptyList(), - severity = "sev3", - actionExecutionResults = emptyList() - ) - createAlert(alert) val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ @@ -46,9 +28,9 @@ class 
DocumentMonitorRunnerIT : AlertingRestTestCase() { val docReturningInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) - val monitor = randomDocumentReturningMonitor( + val monitor = randomDocumentLevelMonitor( inputs = listOf(docReturningInput), - triggers = listOf(randomDocumentReturningTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) ) indexDoc(index, "1", testDoc) @@ -85,8 +67,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) - val monitor = randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger)) indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) @@ -105,6 +87,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } fun `test execute monitor generates alerts and findings`() { + putFindingMappings() val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) val testDoc = """{ @@ -116,8 +99,9 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) - val monitor = 
createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) Thread.sleep(2000) indexDoc(testIndex, "1", testDoc) @@ -141,8 +125,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { // TODO: modify findings such that there is a finding per document, so this test will need to be modified val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertEquals("Findings saved for test monitor", "1", findings[0].relatedDocId) - assertEquals("Findings saved for test monitor", "5", findings[1].relatedDocId) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) } @Suppress("UNCHECKED_CAST") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index e0cde7415..540fd166c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -255,7 +255,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { executeMonitor(monitor.id) assertTrue("There's still an active alert", searchAlerts(monitor, AlertIndices.ALERT_INDEX).isEmpty()) - val completedAlert = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN).single() + val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() verifyAlert(completedAlert, monitor, COMPLETED) } @@ -490,7 +490,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { val errorAlert = searchAlerts(monitor).single() verifyAlert(errorAlert, monitor, ERROR) 
executeMonitor(monitor.id) - val completedAlert = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN).single() + val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() verifyAlert(completedAlert, monitor, COMPLETED) assertNull("Completed alert still has error message.", completedAlert.errorMessage) @@ -736,7 +736,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { Thread.sleep(200) updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id)) executeMonitor(monitor.id) - val completedAlert = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN).single() + val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() verifyAlert(completedAlert, monitor, COMPLETED) updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = ALWAYS_RUN)), id = monitor.id)) @@ -1210,7 +1210,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { executeMonitor(monitor.id) // Verify expected alert was completed - alerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + alerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN) val activeAlerts = alerts.filter { it.state == ACTIVE } val completedAlerts = alerts.filter { it.state == COMPLETED } assertEquals("Incorrect number of active alerts", 1, activeAlerts.size) @@ -1305,7 +1305,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Execute Monitor and check that both Alerts were updated Thread.sleep(200) executeMonitor(monitor.id) - currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN) val completedAlerts = currentAlerts.filter { it.state == COMPLETED } assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size) val previouslyAcknowledgedAlert = completedAlerts.single { it.aggregationResultBucket?.getBucketKeysHash().equals("test_value_1") } @@ -1531,7 +1531,7 @@ class 
MonitorRunnerServiceIT : AlertingRestTestCase() { // Execute Monitor and check that both Alerts were moved to COMPLETED executeMonitor(monitor.id) - currentAlerts = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN) + currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN) val completedAlerts = currentAlerts.filter { it.state == COMPLETED } assertEquals("Incorrect number of completed alerts", 2, completedAlerts.size) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 900a8b7a9..5eac91100 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -152,7 +152,7 @@ fun randomClusterMetricsMonitor( ) } -fun randomDocumentReturningMonitor( +fun randomDocumentLevelMonitor( name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), user: User? = randomUser(), inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), @@ -207,7 +207,7 @@ fun randomBucketLevelTrigger( fun randomActionsForBucketLevelTrigger(min: Int = 0, max: Int = 10, destinationId: String = ""): List = (min..randomInt(max)).map { randomActionWithPolicy(destinationId = destinationId) } -fun randomDocumentReturningTrigger( +fun randomDocumentLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), severity: String = "1", @@ -385,7 +385,7 @@ fun randomDocLevelMonitorInput( fun randomFinding( id: String = OpenSearchRestTestCase.randomAlphaOfLength(10), - relatedDocId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + relatedDocIds: List = listOf(OpenSearchRestTestCase.randomAlphaOfLength(10)), monitorId: String = OpenSearchRestTestCase.randomAlphaOfLength(10), monitorName: String = OpenSearchRestTestCase.randomAlphaOfLength(10), index: String = OpenSearchRestTestCase.randomAlphaOfLength(10), @@ -394,7 
+394,7 @@ fun randomFinding( ): Finding { return Finding( id = id, - relatedDocId = relatedDocId, + relatedDocIds = relatedDocIds, monitorId = monitorId, monitorName = monitorName, index = index, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index acd9e25a2..dcf229fe4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -11,8 +11,12 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.ALWAYS_RUN import org.opensearch.alerting.AlertingRestTestCase import org.opensearch.alerting.NEVER_RUN +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.settings.AlertingSettings @@ -26,29 +30,64 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(randomQueryLevelMonitor(triggers = listOf(randomQueryLevelTrigger(condition = ALWAYS_RUN)))) assertIndexExists(AlertIndices.ALERT_INDEX) - assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) + } + + fun `test create finding index`() { + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = 
listOf(docReturningInput), triggers = listOf(trigger))) + + executeMonitor(monitor.id) + + assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) } fun `test update alert index mapping with new schema version`() { wipeAllODFEIndices() assertIndexDoesNotExist(AlertIndices.ALERT_INDEX) - assertIndexDoesNotExist(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexDoesNotExist(AlertIndices.ALERT_HISTORY_WRITE_INDEX) putAlertMappings( AlertIndices.alertMapping().trimStart('{').trimEnd('}') .replace("\"schema_version\": 3", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.ALERT_INDEX) - assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 0) - verifyIndexSchemaVersion(AlertIndices.HISTORY_WRITE_INDEX, 0) + verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 0) wipeAllODFEIndices() executeMonitor(createRandomMonitor()) assertIndexExists(AlertIndices.ALERT_INDEX) - assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 3) - verifyIndexSchemaVersion(AlertIndices.HISTORY_WRITE_INDEX, 3) + verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 3) + } + + fun `test update finding index mapping with new schema version`() { + wipeAllODFEIndices() + assertIndexDoesNotExist(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + + putFindingMappings( + AlertIndices.findingMapping().trimStart('{').trimEnd('}') + .replace("\"schema_version\": 1", "\"schema_version\": 0") + ) + assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 0) + wipeAllODFEIndices() + + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = 
DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id) + assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 5) + verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 1) } fun `test alert index gets recreated automatically if deleted`() { @@ -58,10 +97,10 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor) assertIndexExists(AlertIndices.ALERT_INDEX) - assertIndexExists(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) wipeAllODFEIndices() assertIndexDoesNotExist(AlertIndices.ALERT_INDEX) - assertIndexDoesNotExist(AlertIndices.HISTORY_WRITE_INDEX) + assertIndexDoesNotExist(AlertIndices.ALERT_HISTORY_WRITE_INDEX) val executeResponse = executeMonitor(trueMonitor) val xcp = createParser(XContentType.JSON.xContent(), executeResponse.entity.content) @@ -69,7 +108,28 @@ class AlertIndicesIT : AlertingRestTestCase() { assertNull("Error running a monitor after wiping alert indices", output["error"]) } - fun `test rollover history index`() { + fun `test finding index gets recreated automatically if deleted`() { + wipeAllODFEIndices() + assertIndexDoesNotExist(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + + executeMonitor(trueMonitor.id) + 
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + wipeAllODFEIndices() + assertIndexDoesNotExist(AlertIndices.FINDING_HISTORY_WRITE_INDEX) + + createTestIndex(testIndex) + val executeResponse = executeMonitor(trueMonitor) + val xcp = createParser(XContentType.JSON.xContent(), executeResponse.entity.content) + val output = xcp.map() + assertNull("Error running a monitor after wiping finding indices", output["error"]) + } + + fun `test rollover alert history index`() { // Update the rollover check to be every 1 second and the index max age to be 1 second client().updateSettings(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.key, "1s") client().updateSettings(AlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE.key, "1s") @@ -82,7 +142,24 @@ class AlertIndicesIT : AlertingRestTestCase() { assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3) } - fun `test history disabled`() { + fun `test rollover finding history index`() { + // Update the rollover check to be every 1 second and the index max age to be 1 second + client().updateSettings(AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD.key, "1s") + client().updateSettings(AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE.key, "1s") + + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + executeMonitor(trueMonitor.id) + + // Allow for a rollover index. 
+ Thread.sleep(2000) + assertTrue("Did not find 2 finding indices", getFindingIndices().size >= 2) + } + + fun `test alert history disabled`() { + resetHistorySettings() + + val trigger1 = randomQueryLevelTrigger(condition = ALWAYS_RUN) @@ -97,10 +174,10 @@ class AlertIndicesIT : AlertingRestTestCase() { updateMonitor(monitor1.copy(triggers = listOf(trigger1.copy(condition = NEVER_RUN)), id = monitor1.id), true) executeMonitor(monitor1.id) - val completedAlert1 = searchAlerts(monitor1, AlertIndices.ALL_INDEX_PATTERN).single() + val completedAlert1 = searchAlerts(monitor1, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() assertNotNull("Alert is not completed", completedAlert1.endTime) - assertEquals(1, getHistoryDocCount()) + assertEquals(1, getAlertHistoryDocCount()) // Disable alert history client().updateSettings(AlertingSettings.ALERT_HISTORY_ENABLED.key, "false") @@ -119,11 +196,11 @@ class AlertIndicesIT : AlertingRestTestCase() { // For the second alert, since history is now disabled, searching for the completed alert should return an empty List // since a COMPLETED alert will be removed from the alert index and not added to the history index - val completedAlert2 = searchAlerts(monitor2, AlertIndices.ALL_INDEX_PATTERN) + val completedAlert2 = searchAlerts(monitor2, AlertIndices.ALL_ALERT_INDEX_PATTERN) assertTrue("Alert is not completed", completedAlert2.isEmpty()) // Get history entry count again and ensure the new alert was not added - assertEquals(1, getHistoryDocCount()) + assertEquals(1, getAlertHistoryDocCount()) } fun `test short retention period`() { @@ -139,18 +216,67 @@ class AlertIndicesIT : AlertingRestTestCase() { assertEquals("1 alert should be active", 1, activeAlert.size) assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) // History index is created but is empty - assertEquals(0, getHistoryDocCount()) + assertEquals(0, getAlertHistoryDocCount()) // Mark alert as COMPLETED updateMonitor(monitor.copy(triggers = 
listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id), true) executeMonitor(monitor.id) // Verify alert is completed - val completedAlert = searchAlerts(monitor, AlertIndices.ALL_INDEX_PATTERN).single() + val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() assertNotNull("Alert is not completed", completedAlert.endTime) // The completed alert should be removed from the active alert index and added to the history index - assertEquals(1, getHistoryDocCount()) + assertEquals(1, getAlertHistoryDocCount()) + + // Update rollover check and max docs as well as decreasing the retention period + client().updateSettings(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.key, "1s") + client().updateSettings(AlertingSettings.ALERT_HISTORY_MAX_DOCS.key, 1) + client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "1s") + + // Give some time for history to be rolled over and cleared + Thread.sleep(5000) + + // Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted. 
+ // This leaves two indices: alert index and an empty history write index + assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) + assertEquals(0, getAlertHistoryDocCount()) + } + + fun `test short finding retention period`() { + resetHistorySettings() + + // Create monitor and execute + val testIndex = createTestIndex() + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_field" : "us-west-2" + }""" + indexDoc(testIndex, "1", testDoc) + + executeMonitor(monitor.id) + + // Check if alert is active and alert index is created + val activeAlert = searchAlerts(monitor) + assertEquals("1 alert should be active", 1, activeAlert.size) + assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) + // History index is created but is empty + assertEquals(0, getAlertHistoryDocCount()) + + // Mark doc level alert as Acknowledged + acknowledgeAlerts(monitor, activeAlert[0]) + + // Verify alert is acknowledged + val ackAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() + assertNotNull("Alert is not acknowledged", ackAlert.acknowledgedTime) + + // The acknowledged alert should be added to the history index + assertEquals(1, getAlertHistoryDocCount()) // Update rollover check and max docs as well as decreasing the retention period client().updateSettings(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.key, "1s") @@ -163,7 +289,7 @@ class AlertIndicesIT : AlertingRestTestCase() { // Given the max_docs and retention settings above, the history index will rollover and the non-write index will be deleted.
// This leaves two indices: alert index and an empty history write index assertEquals("Did not find 2 alert indices", 2, getAlertIndices().size) - assertEquals(0, getHistoryDocCount()) + assertEquals(0, getAlertHistoryDocCount()) } private fun assertIndexExists(index: String) { @@ -180,10 +306,23 @@ class AlertIndicesIT : AlertingRestTestCase() { client().updateSettings(AlertingSettings.ALERT_HISTORY_ENABLED.key, "true") client().updateSettings(AlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD.key, "60s") client().updateSettings(AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD.key, "60s") + client().updateSettings(AlertingSettings.FINDING_HISTORY_ENABLED.key, "true") + client().updateSettings(AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD.key, "60s") + client().updateSettings(AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD.key, "60s") } private fun getAlertIndices(): List { - val response = client().makeRequest("GET", "/_cat/indices/${AlertIndices.ALL_INDEX_PATTERN}?format=json") + val response = client().makeRequest("GET", "/_cat/indices/${AlertIndices.ALL_ALERT_INDEX_PATTERN}?format=json") + val xcp = createParser(XContentType.JSON.xContent(), response.entity.content) + val responseList = xcp.list() + val indices = mutableListOf() + responseList.filterIsInstance>().forEach { indices.add(it["index"] as String) } + + return indices + } + + private fun getFindingIndices(): List { + val response = client().makeRequest("GET", "/_cat/indices/${AlertIndices.ALL_FINDING_INDEX_PATTERN}?format=json") val xcp = createParser(XContentType.JSON.xContent(), response.entity.content) val responseList = xcp.list() val indices = mutableListOf() @@ -192,7 +331,7 @@ class AlertIndicesIT : AlertingRestTestCase() { return indices } - private fun getHistoryDocCount(): Long { + private fun getAlertHistoryDocCount(): Long { val request = """ { "query": { @@ -201,10 +340,10 @@ class AlertIndicesIT : AlertingRestTestCase() { } """.trimIndent() val response = adminClient().makeRequest( - 
"POST", "${AlertIndices.HISTORY_ALL}/_search", emptyMap(), + "POST", "${AlertIndices.ALERT_HISTORY_ALL}/_search", emptyMap(), StringEntity(request, APPLICATION_JSON) ) - assertEquals("Request to get history failed", RestStatus.OK, response.restStatus()) + assertEquals("Request to get alert history failed", RestStatus.OK, response.restStatus()) return SearchResponse.fromXContent(createParser(jsonXContent, response.entity.content)).hits.totalHits!!.value } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt index ca0169ee9..5078beb2d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/FindingTests.kt @@ -18,7 +18,11 @@ class FindingTests : OpenSearchTestCase() { // THEN assertEquals("Template args 'id' field does not match:", templateArgs[Finding.FINDING_ID_FIELD], finding.id) - assertEquals("Template args 'logEvent' field does not match:", templateArgs[Finding.RELATED_DOC_ID_FIELD], finding.relatedDocId) + assertEquals( + "Template args 'relatedDocIds' field does not match:", + templateArgs[Finding.RELATED_DOC_IDS_FIELD], + finding.relatedDocIds + ) assertEquals("Template args 'monitorId' field does not match:", templateArgs[Finding.MONITOR_ID_FIELD], finding.monitorId) assertEquals( "Template args 'monitorName' field does not match:", diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt index 5ca66589b..b48235330 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt @@ -15,7 +15,7 @@ class FindingsRestApiIT : AlertingRestTestCase() { fun `test find Finding where doc is not retrieved`() { - createFinding(matchingDocIds 
= setOf("someId")) + createFinding(matchingDocIds = listOf("someId")) val response = searchFindings() assertEquals(1, response.totalFindings) assertEquals(1, response.findings[0].documents.size) @@ -35,8 +35,8 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) - val findingWith1 = createFinding(matchingDocIds = setOf("someId"), index = testIndex) - val findingWith2 = createFinding(matchingDocIds = setOf("someId", "someId2"), index = testIndex) + val findingWith1 = createFinding(matchingDocIds = listOf("someId"), index = testIndex) + val findingWith2 = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings() assertEquals(2, response.totalFindings) for (findingWithDoc in response.findings) { @@ -69,8 +69,8 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId2", testDoc2) - createFinding(matchingDocIds = setOf("someId"), index = testIndex) - val findingId = createFinding(matchingDocIds = setOf("someId", "someId2"), index = testIndex) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) + val findingId = createFinding(matchingDocIds = listOf("someId", "someId2"), index = testIndex) val response = searchFindings(mapOf(Pair("findingId", findingId))) assertEquals(1, response.totalFindings) assertEquals(findingId, response.findings[0].finding.id) @@ -95,9 +95,9 @@ class FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) - createFinding(matchingDocIds = setOf("someId"), index = testIndex) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( - matchingDocIds = setOf("someId", "someId2"), + matchingDocIds = listOf("someId", "someId2"), index = testIndex, docLevelQueries = listOf(docLevelQuery) ) @@ -125,9 +125,9 @@ class 
FindingsRestApiIT : AlertingRestTestCase() { indexDoc(testIndex, "someId2", testDoc2) val docLevelQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "realQuery", tags = listOf("sigma")) - createFinding(matchingDocIds = setOf("someId"), index = testIndex) + createFinding(matchingDocIds = listOf("someId"), index = testIndex) val findingId = createFinding( - matchingDocIds = setOf("someId", "someId2"), + matchingDocIds = listOf("someId", "someId2"), index = testIndex, docLevelQueries = listOf(docLevelQuery) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index ace1fb9bf..6e00302c2 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -35,8 +35,8 @@ import org.opensearch.alerting.randomAlert import org.opensearch.alerting.randomAnomalyDetector import org.opensearch.alerting.randomAnomalyDetectorWithUser import org.opensearch.alerting.randomBucketLevelTrigger -import org.opensearch.alerting.randomDocumentReturningMonitor -import org.opensearch.alerting.randomDocumentReturningTrigger +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger import org.opensearch.alerting.randomQueryLevelMonitor import org.opensearch.alerting.randomQueryLevelTrigger import org.opensearch.alerting.randomThrottle @@ -798,7 +798,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) - val historyAlerts = searchAlerts(monitor, AlertIndices.HISTORY_WRITE_INDEX) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) assertEquals("Alert was not moved to history", 1, historyAlerts.size) assertEquals( "Alert data incorrect", @@ -827,7 +827,7 @@ 
class MonitorRestApiIT : AlertingRestTestCase() { val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) - val historyAlerts = searchAlerts(monitor, AlertIndices.HISTORY_WRITE_INDEX) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) assertEquals("Alert was not moved to history", 1, historyAlerts.size) assertEquals( "Alert data incorrect", @@ -860,7 +860,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("One alert should be in active index", 1, alerts.size) assertEquals("Wrong alert in active index", alertKeep.toJsonString(), alerts.single().toJsonString()) - val historyAlerts = searchAlerts(monitor, AlertIndices.HISTORY_WRITE_INDEX) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) // Only alertDelete should of been moved to history index assertEquals("One alert should be in history index", 1, historyAlerts.size) assertEquals( @@ -1120,8 +1120,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) - val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) val createResponse = client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) @@ -1141,9 +1141,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = 
randomDocumentReturningTrigger(condition = ALWAYS_RUN) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) val monitor = createMonitor( - randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger), user = null) + randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger), user = null) ) val storedMonitor = getMonitor(monitor.id) @@ -1157,8 +1157,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) - val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) val updatedTriggers = listOf( DocumentLevelTrigger( @@ -1188,8 +1188,8 @@ class MonitorRestApiIT : AlertingRestTestCase() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docReturningInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) - val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) - val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) val deleteResponse = client().makeRequest("DELETE", monitor.relativeUrl()) assertEquals("Delete failed", RestStatus.OK, deleteResponse.restStatus()) @@ -1201,7 +1201,7 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test creating a document monitor with error 
trigger`() { val trigger = randomQueryLevelTrigger() try { - val monitor = randomDocumentReturningMonitor(triggers = listOf(trigger)) + val monitor = randomDocumentLevelMonitor(triggers = listOf(trigger)) client().makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) fail("Monitor with illegal trigger should be rejected.") } catch (e: IllegalArgumentException) {