diff --git a/alerting/build.gradle b/alerting/build.gradle
index c41754a5e..2e1eaba6e 100644
--- a/alerting/build.gradle
+++ b/alerting/build.gradle
@@ -92,6 +92,7 @@ dependencies {
     testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
     testImplementation "org.mockito:mockito-core:4.7.0"
+    testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
 }
 
 javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
index 676fdf685..1ed622c30 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -39,6 +39,7 @@ import org.opensearch.common.xcontent.XContentParser
 import org.opensearch.common.xcontent.XContentParserUtils
 import org.opensearch.common.xcontent.XContentType
 import org.opensearch.commons.alerting.model.BucketLevelTrigger
+import org.opensearch.commons.alerting.model.DataSources
 import org.opensearch.commons.alerting.model.Monitor
 import org.opensearch.commons.alerting.model.Trigger
 import org.opensearch.commons.alerting.model.action.AlertCategory
@@ -277,7 +278,13 @@ class AlertService(
         } ?: listOf()
     }
 
-    suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy, allowUpdatingAcknowledgedAlert: Boolean = false) {
+    suspend fun saveAlerts(
+        dataSources: DataSources,
+        alerts: List<Alert>,
+        retryPolicy: BackoffPolicy,
+        allowUpdatingAcknowledgedAlert: Boolean = false
+    ) {
+        val alertIndex = dataSources.alertsIndex
         var requestsToRetry = alerts.flatMap { alert ->
             // We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
             // In the rare event that a user acknowledges an alert between when it's read and when it's written
@@ -286,7 +293,7 @@ class AlertService(
             when (alert.state) {
                 Alert.State.ACTIVE, Alert.State.ERROR -> {
                     listOf<DocWriteRequest<*>>(
-                        IndexRequest(AlertIndices.ALERT_INDEX)
+                        IndexRequest(alertIndex)
                             .routing(alert.monitorId)
                             .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                             .id(if (alert.id != Alert.NO_ID) alert.id else null)
@@ -297,7 +304,7 @@ class AlertService(
                     // and updated by the MonitorRunner
                     if (allowUpdatingAcknowledgedAlert) {
                         listOf<DocWriteRequest<*>>(
-                            IndexRequest(AlertIndices.ALERT_INDEX)
+                            IndexRequest(alertIndex)
                                 .routing(alert.monitorId)
                                 .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
                                 .id(if (alert.id != Alert.NO_ID) alert.id else null)
@@ -311,10 +318,10 @@ class AlertService(
                 }
                 Alert.State.COMPLETED -> {
                     listOfNotNull<DocWriteRequest<*>>(
-                        DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
+                        DeleteRequest(alertIndex, alert.id)
                             .routing(alert.monitorId),
                         // Only add completed alert to history index if history is enabled
-                        if (alertIndices.isAlertHistoryEnabled()) {
+                        if (alertIndices.isAlertHistoryEnabled(dataSources)) {
                             IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
                                 .routing(alert.monitorId)
                                 .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
@@ -349,7 +356,7 @@ class AlertService(
     * The Alerts are required with their indexed ID so that when the new Alerts are updated after the Action execution,
    * the ID is available for the index request so that the existing Alert can be updated, instead of creating a duplicate Alert document.
     */
-    suspend fun saveNewAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
+    suspend fun saveNewAlerts(dataSources: DataSources, alerts: List<Alert>, retryPolicy: BackoffPolicy): List<Alert> {
         val savedAlerts = mutableListOf<Alert>()
         var alertsBeingIndexed = alerts
         var requestsToRetry: MutableList<IndexRequest> = alerts.map { alert ->
@@ -359,7 +366,7 @@ class AlertService(
             if (alert.id != Alert.NO_ID) {
                 throw IllegalStateException("Unexpected attempt to save new alert [$alert] with an existing alert ID [${alert.id}]")
             }
-            IndexRequest(AlertIndices.ALERT_INDEX)
+            IndexRequest(dataSources.alertsIndex)
                 .routing(alert.monitorId)
                 .source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
         }.toMutableList()
diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
index 6d73928ad..46d388bed 100644
--- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
+++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt
@@ -44,8 +44,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
 
         var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
         val currentAlerts = try {
-            monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
-            monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
+            monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
+            monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
             monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
         } catch (e: Exception) {
             // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
@@ -135,8 +135,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
                  * will still execute with the Alert information in the ctx but the Alerts may not be visible.
                  */
                 if (!dryrun && monitor.id != Monitor.NO_ID) {
-                    monitorCtx.alertService!!.saveAlerts(dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true)
-                    newAlerts = monitorCtx.alertService!!.saveNewAlerts(newAlerts, monitorCtx.retryPolicy!!)
+                    monitorCtx.alertService!!.saveAlerts(
+                        monitor.dataSources, dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true
+                    )
+                    newAlerts = monitorCtx.alertService!!.saveNewAlerts(monitor.dataSources, newAlerts, monitorCtx.retryPolicy!!)
                 }
 
                 // Store deduped and new Alerts to accumulate across pages
@@ -269,9 +271,12 @@ object BucketLevelMonitorRunner : MonitorRunner() {
         // Update Alerts with action execution results (if it's not a test Monitor).
         // ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) { - monitorCtx.alertService!!.saveAlerts(updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false) + monitorCtx.alertService!!.saveAlerts( + monitor.dataSources, updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false + ) // Save any COMPLETED Alerts that were not covered in updatedAlerts monitorCtx.alertService!!.saveAlerts( + monitor.dataSources, completedAlertsToUpdate.toList(), monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 5ca511625..ff04b3417 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -14,7 +14,6 @@ import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest -import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata @@ -41,7 +40,6 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor -import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.util.string import org.opensearch.index.query.BoolQueryBuilder @@ -71,9 +69,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) try { - monitorCtx.alertIndices!!.createOrUpdateAlertIndex() - monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() - monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex() + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) + monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources) } catch (e: Exception) { val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id logger.error("Error setting up alerts and findings indices for monitor: $id", e) @@ -87,7 +85,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return monitorResult.copy(error = AlertingException.wrap(e)) } - monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex() + monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( monitor = monitor, monitorId = monitor.id, @@ -291,7 +289,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { alert.copy(actionExecutionResults = actionExecutionResults) } - monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(updatedAlerts, it) } + monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) } } return triggerResult } @@ -320,7 +318,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Findings: $findingStr") if 
(shouldCreateFinding) { - val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX) + val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) .source(findingStr, XContentType.JSON) .id(finding.id) @@ -507,7 +505,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } boolQueryBuilder.filter(percolateQueryBuilder) - val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val queryIndex = monitor.dataSources.queryIndex + val searchRequest = SearchRequest(queryIndex) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) @@ -517,7 +516,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + throw IOException("Failed to search percolate index: $queryIndex") } return response.hits } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index f44594785..8ae60f568 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -36,8 +36,8 @@ object QueryLevelMonitorRunner : MonitorRunner() { var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) val currentAlerts = try { - monitorCtx.alertIndices!!.createOrUpdateAlertIndex() - monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex() + monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) + monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor) } catch (e: Exception) { // We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts @@ -81,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { // Don't save alerts if this is a test monitor if (!dryrun && monitor.id != Monitor.NO_ID) { - monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(updatedAlerts, it) } + monitorCtx.retryPolicy?.let { monitorCtx.alertService!!.saveAlerts(monitor.dataSources, updatedAlerts, it) } } return monitorResult.copy(triggerResults = triggerResults) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index cf0c57859..e813e8d0d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -45,6 +45,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.threadpool.Scheduler.Cancellable import org.opensearch.threadpool.ThreadPool import java.time.Instant @@ -230,7 +231,12 @@ class AlertIndices( return alertIndexInitialized && alertHistoryIndexInitialized } - fun isAlertHistoryEnabled(): Boolean = alertHistoryEnabled + fun isAlertHistoryEnabled(dataSources: DataSources): Boolean { + if (dataSources.alertsIndex == ALERT_INDEX) { + return alertHistoryEnabled + } + return false + } fun 
isFindingHistoryEnabled(): Boolean = findingHistoryEnabled @@ -243,7 +249,23 @@ class AlertIndices( } alertIndexInitialized } + suspend fun createOrUpdateAlertIndex(dataSources: DataSources) { + if (dataSources.alertsIndex == ALERT_INDEX) { + return createOrUpdateAlertIndex() + } + val alertsIndex = dataSources.alertsIndex + if (!clusterService.state().routingTable().hasIndex(alertsIndex)) { + alertIndexInitialized = createIndex(alertsIndex!!, alertMapping()) + } else { + updateIndexMapping(alertsIndex!!, alertMapping()) + } + } + suspend fun createOrUpdateInitialAlertHistoryIndex(dataSources: DataSources) { + if (dataSources.alertsIndex == ALERT_INDEX) { + return createOrUpdateInitialAlertHistoryIndex() + } + } suspend fun createOrUpdateInitialAlertHistoryIndex() { if (!alertHistoryIndexInitialized) { alertHistoryIndexInitialized = createIndex(ALERT_HISTORY_INDEX_PATTERN, alertMapping(), ALERT_HISTORY_WRITE_INDEX) @@ -273,6 +295,21 @@ class AlertIndices( findingHistoryIndexInitialized } + suspend fun createOrUpdateInitialFindingHistoryIndex(dataSources: DataSources) { + if (dataSources.findingsIndex == FINDING_HISTORY_WRITE_INDEX) { + return createOrUpdateInitialFindingHistoryIndex() + } + val findingsIndex = dataSources.findingsIndex + if (!clusterService.state().routingTable().hasIndex(findingsIndex)) { + createIndex( + findingsIndex, + findingMapping() + ) + } else { + updateIndexMapping(findingsIndex, findingMapping(), false) + } + } + private suspend fun createIndex(index: String, schemaMapping: String, alias: String? = null): Boolean { // This should be a fast check of local cluster state. Should be exceedingly rare that the local cluster // state does not contain the index and multiple nodes concurrently try to create the index. @@ -306,7 +343,7 @@ class AlertIndices( return } - var putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex) + val putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex) .source(mapping, XContentType.JSON) val updateResponse: AcknowledgedResponse = client.admin().indices().suspendUntil { putMapping(putMappingRequest, it) } if (updateResponse.isAcknowledged) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index 0e7705ccb..7925dff0a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting.resthandler import org.apache.logging.log4j.LogManager import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.util.IF_PRIMARY_TERM import org.opensearch.alerting.util.IF_SEQ_NO import org.opensearch.alerting.util.REFRESH @@ -21,6 +22,7 @@ import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger +import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.BaseRestHandler import org.opensearch.rest.BaseRestHandler.RestChannelConsumer @@ -82,6 +84,7 @@ class RestIndexMonitorAction : BaseRestHandler() { val xcp = request.contentParser() 
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + validateDataSources(monitor) val monitorType = monitor.monitorType val triggers = monitor.triggers when (monitorType) { @@ -121,6 +124,18 @@ class RestIndexMonitorAction : BaseRestHandler() { } } + private fun validateDataSources(monitor: Monitor) { // Data Sources will currently be supported only at transport layer. + if (monitor.dataSources != null) { + if ( + monitor.dataSources.queryIndex != ScheduledJob.DOC_LEVEL_QUERIES_INDEX || + monitor.dataSources.findingsIndex != AlertIndices.FINDING_HISTORY_WRITE_INDEX || + monitor.dataSources.alertsIndex != AlertIndices.ALERT_INDEX + ) { + throw IllegalArgumentException("Custom Data Sources are not allowed.") + } + } + } + private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener { return object : RestResponseListener(channel) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 8bb246724..dd804e980 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -126,7 +126,7 @@ class TransportExecuteMonitorAction @Inject constructor( try { scope.launch { if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { - docLevelMonitorQueries.initDocLevelQueryIndex() + docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") } docLevelMonitorQueries.indexDocLevelQueries( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index ba33ff2aa..bfcad6d5a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -447,9 +447,10 @@ class TransportIndexMonitorAction @Inject constructor( @Suppress("UNCHECKED_CAST") private suspend fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String, refreshPolicy: RefreshPolicy) { - if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { - docLevelMonitorQueries.initDocLevelQueryIndex() - log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + val queryIndex = monitor.dataSources.queryIndex + if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) { + docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) + log.info("Central Percolation index $queryIndex created") } docLevelMonitorQueries.indexDocLevelQueries( monitor, @@ -457,7 +458,7 @@ class TransportIndexMonitorAction @Inject constructor( refreshPolicy, indexTimeout ) - log.debug("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + log.debug("Queries inserted into Percolate index $queryIndex") } private suspend fun updateMonitor() { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 9b35f39b8..98693def1 100644 --- 
a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -22,6 +22,7 @@ import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor @@ -58,6 +59,36 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } return true } + suspend fun initDocLevelQueryIndex(dataSources: DataSources): Boolean { + if (dataSources.queryIndex == ScheduledJob.DOC_LEVEL_QUERIES_INDEX) { + return initDocLevelQueryIndex() + } + val queryIndex = dataSources.queryIndex + if (!clusterService.state().routingTable.hasIndex(queryIndex)) { + val indexRequest = CreateIndexRequest(queryIndex) + .mapping(docLevelQueriesMappings()) + .settings( + Settings.builder().put("index.hidden", true) + .build() + ) + return try { + val createIndexResponse: CreateIndexResponse = client.suspendUntil { client.admin().indices().create(indexRequest, it) } + createIndexResponse.isAcknowledged + } catch (t: ResourceAlreadyExistsException) { + if (t.message?.contains("already exists") == true) { + true + } else { + throw t + } + } + } + return true + } + + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { + val clusterState = clusterService.state() + return clusterState.routingTable.hasIndex(dataSources.queryIndex) + } fun docLevelQueryIndexExists(): Boolean { val clusterState = clusterService.state() @@ -92,16 +123,21 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) val updatedProperties = properties.entries.associate { - if (it.value.containsKey("path")) { - val newVal = it.value.toMutableMap() - newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" - "${it.key}_${indexName}_$monitorId" to newVal - } else { - "${it.key}_${indexName}_$monitorId" to it.value + val newVal = it.value.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (it.value.containsKey("type") && mappingsByType.containsKey(it.value["type"]!!)) { + mappingsByType[it.value["type"]]?.entries?.forEach { iter: Map.Entry -> + newVal[iter.key] = iter.value + } + } } + if (it.value.containsKey("path")) newVal["path"] = "${it.value["path"]}_${indexName}_$monitorId" + "${it.key}_${indexName}_$monitorId" to newVal } + val queryIndex = monitor.dataSources.queryIndex - val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val updateMappingRequest = PutMappingRequest(queryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) val updateMappingResponse: AcknowledgedResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) @@ -114,7 +150,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ properties.forEach { prop -> query = query.replace("${prop.key}:", "${prop.key}_${indexName}_$monitorId:") } - val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val indexRequest = IndexRequest(queryIndex) .id(it.id + "_${indexName}_$monitorId") .source( mapOf( diff --git 
a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index aa2fdc39d..6fdba9863 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -76,6 +76,9 @@ import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL import kotlin.collections.HashMap +/** + * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient + */ abstract class AlertingRestTestCase : ODFERestTestCase() { protected val isDebuggingTest = DisableOnDebug(null).isDebugging diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 96da7a8f7..b53398b5b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -9,6 +9,7 @@ import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PAT import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN import org.opensearch.client.Response import org.opensearch.client.ResponseException +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy @@ -579,6 +580,50 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test execute monitor with non-null data sources`() { + + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val alertCategories = AlertCategory.values() + val actionExecutionScope = PerAlertActionScope( + actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet() + ) + val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope) + val actions = (0..randomInt(10)).map { + randomActionWithPolicy( + template = randomTemplateScript("Hello {{ctx.monitor.name}}"), + destinationId = createDestination().id, + actionExecutionPolicy = actionExecutionPolicy + ) + } + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions) + try { + createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + findingsIndex = "custom_findings_index", + alertsIndex = "custom_alerts_index", + ) + ) + ) + fail("Expected create monitor to fail") + } catch (e: ResponseException) { + assertTrue(e.message!!.contains("illegal_argument_exception")) + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt new file mode 100644 index 
000000000..4617462d6 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -0,0 +1,273 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.junit.Assert +import org.opensearch.action.admin.cluster.state.ClusterStateRequest +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.transport.AlertingSingleNodeTestCase +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit.MILLIS + +class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { + + fun `test execute monitor with dryrun`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, true) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + } + + fun `test execute monitor with custom alerts index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(alertsIndex = customAlertsIndex) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + val alerts = searchAlerts(id, customAlertsIndex) + assertEquals("Alert saved for test monitor", 1, alerts.size) + } + + fun `test execute monitor with custom query index`() { + 
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(queryIndex = customQueryIndex) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + } + + fun `test execute monitor with custom query index and custom field mappings`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customQueryIndex = "custom_alerts_index" + val analyzer = "whitespace" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get() + val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping() + Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true) + } + + fun `test execute monitor with custom findings index`() { + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources(findingsIndex = customFindingsIndex) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc 
= """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + indexDoc(index, "1", testDoc) + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + } + + fun `test execute pre-existing monitorand update`() { + val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) + .settings(Settings.builder().put("index.hidden", true).build()) + client().admin().indices().create(request) + val monitorStringWithoutName = """ + { + "monitor": { + "type": "monitor", + "schema_version": 0, + "name": "UayEuXpZtb", + "monitor_type": "doc_level_monitor", + "user": { + "name": "", + "backend_roles": [], + "roles": [], + "custom_attribute_names": [], + "user_requested_tenant": null + }, + "enabled": true, + "enabled_time": 1662753436791, + "schedule": { + "period": { + "interval": 5, + "unit": "MINUTES" + } + }, + "inputs": [{ + "doc_level_input": { + "description": "description", + "indices": [ + "$index" + ], + "queries": [{ + "id": "63efdcce-b5a1-49f4-a25f-6b5f9496a755", + "name": "3", + "query": "test_field:\"us-west-2\"", + "tags": [] + }] + } + }], + "triggers": [{ + "document_level_trigger": { + "id": "OGnTI4MBv6qt0ATc9Phk", + "name": "mrbHRMevYI", + "severity": "1", + "condition": { + "script": { + "source": "return true", + "lang": "painless" + } + }, + "actions": [] + } + }], + "last_update_time": 1662753436791 + } + } + """.trimIndent() + val monitorId = "abc" + indexDoc(SCHEDULED_JOBS_INDEX, monitorId, monitorStringWithoutName) + val getMonitorResponse = getMonitorResponse(monitorId) + Assert.assertNotNull(getMonitorResponse) + Assert.assertNotNull(getMonitorResponse.monitor) + val monitor = getMonitorResponse.monitor + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(index, "1", testDoc) + var executeMonitorResponse = executeMonitor(monitor!!, monitorId, false) + Assert.assertNotNull(executeMonitorResponse) + if (executeMonitorResponse != null) { + Assert.assertNotNull(executeMonitorResponse.monitorRunResult.monitorName) + } + val alerts = searchAlerts(monitorId) + assertEquals(alerts.size, 1) + + val customAlertsIndex = "custom_alerts_index" + val customQueryIndex = "custom_query_index" + val customFindingsIndex = "custom_findings_index" + val updateMonitorResponse = updateMonitor( + monitor.copy( + id = monitorId, + dataSources = DataSources( + alertsIndex = customAlertsIndex, + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex + ) + ), + monitorId + ) + Assert.assertNotNull(updateMonitorResponse) + indexDoc(index, "2", testDoc) + if (updateMonitorResponse != null) { + executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false) + } + val findings = searchFindings(monitorId, 
customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2")) + val customAlertsIndexAlerts = searchAlerts(monitorId, customAlertsIndex) + assertEquals("Alert saved for test monitor", 1, customAlertsIndexAlerts.size) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 60a4a87a5..a698812a7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -41,6 +41,7 @@ import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelec import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter import org.opensearch.commons.alerting.model.BucketLevelTrigger import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -170,6 +171,25 @@ fun randomDocumentLevelMonitor( ) } +fun randomDocumentLevelMonitor( + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + inputs: List = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + triggers: List = (1..randomInt(10)).map { randomQueryLevelTrigger() }, + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + withMetadata: Boolean = false, + dataSources: DataSources +): Monitor { + return Monitor( + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources + ) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt new file mode 100644 index 000000000..5153ea1d6 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -0,0 +1,157 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.action.ExecuteMonitorAction +import org.opensearch.alerting.action.ExecuteMonitorRequest +import org.opensearch.alerting.action.ExecuteMonitorResponse +import org.opensearch.alerting.action.GetMonitorAction +import org.opensearch.alerting.action.GetMonitorRequest +import 
org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.model.Finding +import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentType +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexMonitorRequest +import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.index.query.TermQueryBuilder +import org.opensearch.index.reindex.ReindexPlugin +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.plugins.Plugin +import org.opensearch.rest.RestRequest +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext +import org.opensearch.test.OpenSearchSingleNodeTestCase +import java.time.Instant +import java.util.* + +/** + * A test that keep a singleton node started for all tests that can be used to get + * references to Guice injectors in unit tests. + */ +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { + + protected val index: String = randomAlphaOfLength(10).lowercase(Locale.ROOT) + + override fun setUp() { + super.setUp() + createTestIndex() + } + + protected fun executeMonitor(monitor: Monitor, id: String, dryRun: Boolean = true): ExecuteMonitorResponse? { + val request = ExecuteMonitorRequest(dryRun, TimeValue(Instant.now().toEpochMilli()), id, monitor) + return client().execute(ExecuteMonitorAction.INSTANCE, request).get() + } + + /** A test index that can be used across tests. Feel free to add new fields but don't remove any. */ + protected fun createTestIndex() { + createIndex( + index, Settings.EMPTY, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" } + } + """.trimIndent() + ) + } + + protected fun indexDoc(index: String, id: String, doc: String) { + client().prepareIndex(index).setId(id) + .setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get() + } + + protected fun createMonitor(monitor: Monitor): IndexMonitorResponse? { + val request = IndexMonitorRequest( + monitorId = Monitor.NO_ID, + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + refreshPolicy = WriteRequest.RefreshPolicy.parse("true"), + method = RestRequest.Method.POST, + monitor = monitor + ) + return client().execute(AlertingActions.INDEX_MONITOR_ACTION_TYPE, request).actionGet() + } + + protected fun updateMonitor(monitor: Monitor, monitorId: String): IndexMonitorResponse? 
{ + val request = IndexMonitorRequest( + monitorId = monitorId, + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + refreshPolicy = WriteRequest.RefreshPolicy.parse("true"), + method = RestRequest.Method.PUT, + monitor = monitor + ) + return client().execute(AlertingActions.INDEX_MONITOR_ACTION_TYPE, request).actionGet() + } + + protected fun searchAlerts(id: String, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return emptyList() + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder(Alert.MONITOR_ID_FIELD, id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + Alert.parse(xcp, it.id, it.version) + } + } + + protected fun refreshIndex(index: String) { + client().execute(RefreshAction.INSTANCE, RefreshRequest(index)).get() + } + + protected fun searchFindings( + id: String, + indices: String = AlertIndices.ALL_FINDING_INDEX_PATTERN, + refresh: Boolean = true + ): List { + if (refresh) refreshIndex(indices) + + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder(Alert.MONITOR_ID_FIELD, id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + Finding.parse(xcp) + }.filter { finding -> finding.monitorId == id } + } + + protected fun getMonitorResponse( + monitorId: String, + version: Long = 1L, + fetchSourceContext: FetchSourceContext = FetchSourceContext.FETCH_SOURCE + ) = client().execute( + GetMonitorAction.INSTANCE, + GetMonitorRequest(monitorId, version, RestRequest.Method.GET, fetchSourceContext) + ).get() + + override fun getPlugins(): List> { + return listOf(AlertingPlugin::class.java, ReindexPlugin::class.java) + } + + override fun resetNodeAfterTest(): Boolean { + return false + } +} diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 9771686ff..8254e1ebf 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -118,6 +118,46 @@ } } }, + "data_sources": { + "properties": { + "alerts_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "findings_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "query_index": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "query_index_mapping": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, "group_by_fields": { "type": "text", "fields": {
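Note: the new DataSources model is what routes a monitor's alerts, findings, and percolate queries to custom indices. Below is a minimal Kotlin sketch of defining such a monitor, mirroring TestHelpers.randomDocumentLevelMonitor and the MonitorDataSourcesIT cases above; the function name, index names, and analyzer are illustrative only, and the empty trigger list is a simplification. As RestIndexMonitorAction.validateDataSources shows, custom data sources are currently accepted only through the transport-layer IndexMonitorRequest, not the REST handler.

// Usage sketch (assumptions: index names, analyzer, and helper name are illustrative).
import java.time.Instant
import java.time.temporal.ChronoUnit
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor

fun docLevelMonitorWithCustomIndices(sourceIndex: String): Monitor {
    val query = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
    val input = DocLevelMonitorInput("description", listOf(sourceIndex), listOf(query))
    return Monitor(
        name = "example-doc-level-monitor",
        monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR,
        enabled = true,
        schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
        inputs = listOf(input),
        triggers = emptyList(), // a real monitor would add a DocumentLevelTrigger here
        enabledTime = Instant.now(),
        lastUpdateTime = Instant.now(),
        user = null,
        uiMetadata = mapOf(),
        // Route alerts, findings, and percolate queries to monitor-specific indices
        // instead of the shared alerting system indices.
        dataSources = DataSources(
            alertsIndex = "custom_alerts_index",
            findingsIndex = "custom_findings_index",
            queryIndex = "custom_query_index",
            // Optional per-field-type mapping overrides applied when doc-level queries are indexed.
            queryIndexMappingsByType = mapOf("text" to mapOf("analyzer" to "whitespace"))
        )
    )
}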