From 225974c55ee05b5aa52a7b3aab85086d6bde5ad0 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 1 Mar 2024 10:10:30 -0800 Subject: [PATCH] optimize sequence number calculation and reduce search requests in doc level monitor execution (#1445) * optimize sequence number calculation and reduce search requests by n where n is number of shards being queried in the executino Signed-off-by: Surya Sashank Nistala * fix tests Signed-off-by: Surya Sashank Nistala * optimize check indices and execute to query only write index of aliases and datastreams during monitor creation Signed-off-by: Surya Sashank Nistala * fix test Signed-off-by: Surya Sashank Nistala * add javadoc Signed-off-by: Surya Sashank Nistala * add tests to verify seq_no calculation Signed-off-by: Surya Sashank Nistala --------- Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 2 + .../alerting/DocumentLevelMonitorRunner.kt | 152 +++++++++--------- .../alerting/MonitorRunnerExecutionContext.kt | 2 + .../alerting/MonitorRunnerService.kt | 8 + .../model/DocumentExecutionContext.kt | 14 -- .../alerting/model/IndexExecutionContext.kt | 19 +++ .../alerting/settings/AlertingSettings.kt | 11 ++ .../transport/TransportIndexMonitorAction.kt | 10 +- .../alerting/DocumentMonitorRunnerIT.kt | 67 ++++++-- 9 files changed, 187 insertions(+), 98 deletions(-) delete mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 93a0cf677..bcb26bc17 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -46,6 +46,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings @@ -332,6 +333,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 22079de77..ff939418a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -17,8 +17,8 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult @@ -30,7 +30,6 @@ import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.workflow.WorkflowRunContext -import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.Preference @@ -59,6 +58,8 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indices.IndexClosedException import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits @@ -207,7 +208,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( + val indexUpdatedRunContext = initializeNewLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, concreteIndexName, @@ -255,25 +256,29 @@ class DocumentLevelMonitorRunner : MonitorRunner() { "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" ) } - - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - - fetchShardDataAndMaybeExecutePercolateQueries( - monitor, - monitorCtx, - docExecutionContext, + val indexExecutionContext = IndexExecutionContext( + queries, + indexLastRunContext, + indexUpdatedRunContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), matchingDocIdsPerIndex?.get(concreteIndexName), + ) + + fetchShardDataAndMaybeExecutePercolateQueries( + monitor, + monitorCtx, + indexExecutionContext, monitorMetadata, inputRunResults, docsToQueries, updatedIndexNames, concreteIndicesSeenSoFar, ArrayList(fieldsToBeQueried) - ) + ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number + indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo + } } } /* if all indices are covered still in-memory docs size limit is not breached we would need to submit @@ -615,7 +620,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) } - private suspend fun updateLastRunContext( + private fun initializeNewLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, index: String, @@ -624,8 +629,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) - updatedLastRunContext[shard] = maxSeqNo.toString() + updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString() } return updatedLastRunContext } @@ -657,33 +661,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { return indexCreationDate > lastExecutionTime.toEpochMilli() } - /** - * Get the current max seq number of the shard. We find it by searching the last document - * in the primary shard. - */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { - val request: SearchRequest = SearchRequest() - .indices(index) - .preference("_shards:$shard") - .source( - SearchSourceBuilder() - .version(true) - .sort("_seq_no", SortOrder.DESC) - .seqNoAndPrimaryTerm(true) - .query(QueryBuilders.matchAllQuery()) - .size(1) - ) - val response: SearchResponse = client.suspendUntil { client.search(request, it) } - if (response.status() !== RestStatus.OK) { - throw IOException("Failed to get max seq no for shard: $shard") - } - nonPercolateSearchesTimeTakenStat += response.took.millis - if (response.hits.hits.isEmpty()) - return -1L - - return response.hits.hits[0].seqNo - } - private fun getShardsCount(clusterService: ClusterService, index: String): Int { val allShards: List = clusterService!!.state().routingTable().allShards(index) return allShards.filter { it.primary() }.size @@ -697,51 +674,79 @@ class DocumentLevelMonitorRunner : MonitorRunner() { private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docExecutionCtx: DocumentExecutionContext, - indexName: String, - concreteIndexName: String, - conflictingFields: List, - docIds: List? = null, + indexExecutionCtx: IndexExecutionContext, monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, monitorInputIndices: List, concreteIndices: List, fieldsToBeQueried: List, + updateLastRunContext: (String, String) -> Unit ) { - val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int + val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { val shard = i.toString() try { - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - - val hits: SearchHits = searchShard( - monitorCtx, - concreteIndexName, - shard, - prevSeqNo, - maxSeqNo, - docIds, - fieldsToBeQueried - ) - val startTime = System.currentTimeMillis() - transformedDocs.addAll( - transformSearchHitsAndReconstructDocs( - hits, - indexName, - concreteIndexName, - monitor.id, - conflictingFields, + val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED + var to: Long = Long.MAX_VALUE + while (to >= from) { + val hits: SearchHits = searchShard( + monitorCtx, + indexExecutionCtx.concreteIndexName, + shard, + from, + to, + indexExecutionCtx.docIds, + fieldsToBeQueried, ) - ) - docTransformTimeTakenStat += System.currentTimeMillis() - startTime + if (hits.hits.isEmpty()) { + if (to == Long.MAX_VALUE) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs + } + break + } + if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed + updateLastRunContext(shard, hits.hits[0].seqNo.toString()) + } + val leastSeqNoFromHits = hits.hits.last().seqNo + to = leastSeqNoFromHits - 1 + val startTime = System.currentTimeMillis() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexExecutionCtx.indexName, + indexExecutionCtx.concreteIndexName, + monitor.id, + indexExecutionCtx.conflictingFields, + ) + ) + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries, + ) + } + docTransformTimeTakenStat += System.currentTimeMillis() - startTime + } } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + - " Failed to run fetch data from shard [$shard] of index [$concreteIndexName]. Error: ${e.message}", + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + + "Error: ${e.message}", e ) + if (e is IndexClosedException) { + throw e + } } if ( transformedDocs.isNotEmpty() && @@ -833,8 +838,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .source( SearchSourceBuilder() .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) .query(boolQueryBuilder) - .size(10000) + .size(monitorCtx.docLevelMonitorShardFetchSize) ) .preference(Preference.PRIMARY_FIRST.type()) @@ -846,7 +853,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - logger.error("Failed search shard. Response: $response") throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } nonPercolateSearchesTimeTakenStat += response.took.millis diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 043ae88d4..424656c6b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -55,4 +55,6 @@ data class MonitorRunnerExecutionContext( @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + @Volatile var docLevelMonitorShardFetchSize: Int = + AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 9cd3c2401..a8f0a5f41 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -26,6 +26,7 @@ import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT @@ -202,6 +203,13 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it } + monitorCtx.docLevelMonitorShardFetchSize = + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) { + monitorCtx.docLevelMonitorShardFetchSize = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt deleted file mode 100644 index 0caad1f4a..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.model - -import org.opensearch.commons.alerting.model.DocLevelQuery - -data class DocumentExecutionContext( - val queries: List, - val lastRunContext: Map, - val updatedLastRunContext: Map -) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt new file mode 100644 index 000000000..e7aa707f9 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.DocLevelQuery + +/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */ +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, + val updatedLastRunContext: MutableMap, + val indexName: String, + val concreteIndexName: String, + val conflictingFields: List, + val docIds: List? = null, +) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 422c7c452..a0eda830a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -20,6 +20,7 @@ class AlertingSettings { const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 + const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -38,6 +39,16 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + /** Purely a setting used to verify seq_no calculation + */ + val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting( + "plugins.alerting.monitor.doc_level_monitor_shard_fetch_size", + DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, + 1, + 10000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 2100c0593..057977079 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -198,7 +198,15 @@ class TransportIndexMonitorAction @Inject constructor( else (it as DocLevelMonitorInput).indices indices.addAll(inputIndices) } - val searchRequest = SearchRequest().indices(*indices.toTypedArray()) + val updatedIndices = indices.map { index -> + if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) { + val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex + metadata?.index?.name ?: index + } else { + index + } + } + val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray()) .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) client.search( searchRequest, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 8c05db645..f73ec2222 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -137,6 +137,49 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Alert saved for test monitor", 0, alerts.size) } + fun `test seq_no calculation correctness when docs are deleted`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.key, 2) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) + indexDoc(index, "11", testDoc) + indexDoc(index, "21", testDoc) + indexDoc(index, "31", testDoc) + indexDoc(index, "41", testDoc) + indexDoc(index, "51", testDoc) + + deleteDoc(index, "51") + val response = executeMonitor(monitor, params = mapOf("dryrun" to "false")) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(9, triggerResult.objectMap("action_results").values.size) + } + } + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) @@ -163,6 +206,10 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) val response = executeMonitor(monitor, params = DRYRUN_MONITOR) @@ -365,8 +412,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor2) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) // ensure query from second monitor was saved val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"") @@ -419,8 +466,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor with tag as trigger condition generates alerts and findings`() { @@ -459,8 +506,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor input error`() { @@ -558,8 +605,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor generates alerts and findings with per trigger execution for actions`() { @@ -621,8 +668,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor with wildcard index that generates alerts and findings for EQUALS query operator`() {