Skip to content

Commit

Permalink
adds filtering on owner field in search monitor action (#641) (#642)
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>

Signed-off-by: Surya Sashank Nistala <[email protected]>
(cherry picked from commit f80b3e0)

Co-authored-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] and eirsep authored Nov 4, 2022
1 parent eb693ad commit ca22ee3
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Expand Down Expand Up @@ -136,6 +137,12 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}

private fun validateOwner(owner: String?) {
if (owner != "alerting") {
throw IllegalArgumentException("Invalid owner field")
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method):
RestResponseListener<IndexMonitorResponse> {
return object : RestResponseListener<IndexMonitorResponse>(channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
Expand Down Expand Up @@ -97,14 +95,6 @@ class RestSearchMonitorAction(
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser())
searchSourceBuilder.fetchSource(context(request))

val queryBuilder = QueryBuilders.boolQuery().must(searchSourceBuilder.query())
if (index == SCHEDULED_JOBS_INDEX) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
}

searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

Expand All @@ -43,6 +48,14 @@ class TransportSearchMonitorAction @Inject constructor(
}

override fun doExecute(task: Task, searchMonitorRequest: SearchMonitorRequest, actionListener: ActionListener<SearchResponse>) {
val searchSourceBuilder = searchMonitorRequest.searchRequest.source()
val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchSourceBuilder.query())
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
addOwnerFieldIfNotExists(searchMonitorRequest.searchRequest)
val user = readUserFromThreadContext(client)
client.threadPool().threadContext.stashContext().use {
resolve(searchMonitorRequest, actionListener, user)
Expand Down Expand Up @@ -78,4 +91,16 @@ class TransportSearchMonitorAction @Inject constructor(
}
)
}

private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {
if (searchRequest.source().query() == null || searchRequest.source().query().toString().contains("monitor.owner") == false) {
var boolQueryBuilder: BoolQueryBuilder = if (searchRequest.source().query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchRequest.source().query())
val bqb = BoolQueryBuilder()
bqb.should().add(BoolQueryBuilder().mustNot(ExistsQueryBuilder("monitor.owner")))
bqb.should().add(BoolQueryBuilder().must(MatchQueryBuilder("monitor.owner", "alerting")))
boolQueryBuilder.filter(bqb)
searchRequest.source().query(boolQueryBuilder)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -669,4 +669,45 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
return this[key] as Map<String, Map<String, Any>>
}

fun `test execute monitor with non-null owner`() {

val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val alertCategories = AlertCategory.values()
val actionExecutionScope = PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()
)
val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope)
val actions = (0..randomInt(10)).map {
randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = actionExecutionPolicy
)
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
try {
createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
owner = "owner"
)
)
fail("Expected create monitor to fail")
} catch (e: ResponseException) {
assertTrue(e.message!!.contains("illegal_argument_exception"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
Expand All @@ -21,6 +24,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
Expand Down Expand Up @@ -319,6 +323,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorId,
owner = "security_analytics_plugin",
dataSources = DataSources(
alertsIndex = customAlertsIndex,
queryIndex = customQueryIndex,
Expand All @@ -328,6 +333,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId
)
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false)
Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
Expand All @@ -347,6 +353,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
var searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0)
searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin"))
searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1)
}

fun `test execute GetFindingsAction with monitorId param`() {
Expand Down Expand Up @@ -427,6 +443,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}

fun `test execute monitor with owner field`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(alertsIndex = customAlertsIndex),
owner = "owner"
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "owner")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute GetFindingsAction with unknown findingIndex param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
10 changes: 6 additions & 4 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,13 @@ fun randomDocumentLevelMonitor(
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
withMetadata: Boolean = false,
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), owner = owner
)
}

Expand All @@ -181,12 +182,13 @@ fun randomDocumentLevelMonitor(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
dataSources: DataSources
dataSources: DataSources,
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
}

Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/mappings/scheduled-jobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
}
}
},
"owner": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"monitor_type": {
"type": "keyword"
},
Expand Down

0 comments on commit ca22ee3

Please sign in to comment.