Skip to content

Commit

Permalink
create findingIndex bugfix (opensearch-project#653)
Browse files Browse the repository at this point in the history
* findingIndex create bugfix

Signed-off-by: Petar Dzepina <[email protected]>

* empty commit

Signed-off-by: Petar Dzepina <[email protected]>

* IT fix

Signed-off-by: Petar Dzepina <[email protected]>

Signed-off-by: Petar Dzepina <[email protected]>
(cherry picked from commit 1018fa2)
  • Loading branch information
petardz committed Nov 8, 2022
1 parent 6064dad commit 4167869
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ class AlertIndices(
}
val findingsIndex = dataSources.findingsIndex
val findingsIndexPattern = dataSources.findingsIndexPattern ?: FINDING_HISTORY_INDEX_PATTERN
if (!clusterService.state().metadata().hasAlias(findingsIndexPattern)) {
if (!clusterService.state().metadata().hasAlias(findingsIndex)) {
createIndex(
findingsIndexPattern,
findingMapping(),
Expand Down
151 changes: 132 additions & 19 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,27 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
Expand All @@ -47,6 +55,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "alerting")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, true)
Expand Down Expand Up @@ -191,17 +200,67 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true)
}

fun `test execute monitor with custom findings index`() {
fun `test delete monitor deletes all queries and metadata too`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customQueryIndex = "custom_alerts_index"
val analyzer = "whitespace"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(
queryIndex = customQueryIndex,
queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))),
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(monitorId)
val clusterStateResponse = client().admin().cluster().state(ClusterStateRequest().indices(customQueryIndex).metadata(true)).get()
val mapping = clusterStateResponse.state.metadata.index(customQueryIndex).mapping()
Assert.assertTrue(mapping?.source()?.string()?.contains("\"analyzer\":\"$analyzer\"") == true)
// Verify queries exist
var searchResponse = client().search(
SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
).get()
assertNotEquals(0, searchResponse.hits.hits.size)
client().execute(
AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE)
).get()
client().admin().indices().refresh(RefreshRequest(customQueryIndex)).get()
// Verify queries are deleted
searchResponse = client().search(
SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))
).get()
assertEquals(0, searchResponse.hits.hits.size)
}

fun `test execute monitor with custom findings index and pattern`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customFindingsIndex = "custom_findings_index"
val customFindingsIndexPattern = "<custom_findings_index-{now/d}-1>"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(findingsIndex = customFindingsIndex)
dataSources = DataSources(findingsIndex = customFindingsIndex, findingsIndexPattern = customFindingsIndexPattern)
)
val monitorResponse = createMonitor(monitor)
client().admin().indices().refresh(RefreshRequest("*"))
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
Expand All @@ -212,24 +271,25 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
var executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
val findings = searchFindings(id, customFindingsIndex)

var findings = searchFindings(id, "custom_findings_index*", true)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1"))
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)

indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
searchAlerts(id)
findings = searchFindings(id, "custom_findings_index*", true)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("2"))

val indices = getAllIndicesFromPattern("custom_findings_index*")
Assert.assertTrue(indices.isNotEmpty())
}

fun `test execute pre-existing monitorand update`() {
Expand Down Expand Up @@ -314,11 +374,11 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {

val customAlertsIndex = "custom_alerts_index"
val customQueryIndex = "custom_query_index"
Assert.assertFalse(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
val customFindingsIndex = "custom_findings_index"
val updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorId,
owner = "security_analytics_plugin",
dataSources = DataSources(
alertsIndex = customAlertsIndex,
queryIndex = customQueryIndex,
Expand All @@ -328,9 +388,11 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId
)
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false)
Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
if (updateMonitorResponse != null) {
executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false)
}
val findings = searchFindings(monitorId, customFindingsIndex)
assertEquals("Findings saved for test monitor", 1, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("2"))
Expand All @@ -347,6 +409,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
var searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0)
searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin"))
searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1)
}

fun `test execute GetFindingsAction with monitorId param`() {
Expand Down Expand Up @@ -427,6 +499,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}

fun `test execute monitor with owner field`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(alertsIndex = customAlertsIndex),
owner = "owner"
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "owner")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute GetFindingsAction with unknown findingIndex param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.transport

import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope
import org.opensearch.action.admin.indices.get.GetIndexRequestBuilder
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.support.WriteRequest
Expand Down Expand Up @@ -54,6 +56,15 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() {
createTestIndex()
}

protected fun getAllIndicesFromPattern(pattern: String): List<String> {
val getIndexResponse = (
client().admin().indices().prepareGetIndex()
.setIndices(pattern) as GetIndexRequestBuilder
).get() as GetIndexResponse
getIndexResponse
return getIndexResponse.indices().toList()
}

protected fun executeMonitor(monitor: Monitor, id: String, dryRun: Boolean = true): ExecuteMonitorResponse? {
val request = ExecuteMonitorRequest(dryRun, TimeValue(Instant.now().toEpochMilli()), id, monitor)
return client().execute(ExecuteMonitorAction.INSTANCE, request).get()
Expand Down

0 comments on commit 4167869

Please sign in to comment.