Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds filtering on owner field in search monitor action #641

Merged
merged 1 commit into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Expand Down Expand Up @@ -136,6 +137,12 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}

private fun validateOwner(owner: String?) {
if (owner != "alerting") {
throw IllegalArgumentException("Invalid owner field")
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method):
RestResponseListener<IndexMonitorResponse> {
return object : RestResponseListener<IndexMonitorResponse>(channel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
Expand Down Expand Up @@ -97,14 +95,6 @@ class RestSearchMonitorAction(
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser())
searchSourceBuilder.fetchSource(context(request))

val queryBuilder = QueryBuilders.boolQuery().must(searchSourceBuilder.query())
if (index == SCHEDULED_JOBS_INDEX) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
}

searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

Expand All @@ -43,6 +48,14 @@ class TransportSearchMonitorAction @Inject constructor(
}

override fun doExecute(task: Task, searchMonitorRequest: SearchMonitorRequest, actionListener: ActionListener<SearchResponse>) {
val searchSourceBuilder = searchMonitorRequest.searchRequest.source()
val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchSourceBuilder.query())
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
addOwnerFieldIfNotExists(searchMonitorRequest.searchRequest)
val user = readUserFromThreadContext(client)
client.threadPool().threadContext.stashContext().use {
resolve(searchMonitorRequest, actionListener, user)
Expand Down Expand Up @@ -78,4 +91,16 @@ class TransportSearchMonitorAction @Inject constructor(
}
)
}

private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {
if (searchRequest.source().query() == null || searchRequest.source().query().toString().contains("monitor.owner") == false) {
var boolQueryBuilder: BoolQueryBuilder = if (searchRequest.source().query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchRequest.source().query())
val bqb = BoolQueryBuilder()
bqb.should().add(BoolQueryBuilder().mustNot(ExistsQueryBuilder("monitor.owner")))
bqb.should().add(BoolQueryBuilder().must(MatchQueryBuilder("monitor.owner", "alerting")))
boolQueryBuilder.filter(bqb)
searchRequest.source().query(boolQueryBuilder)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -669,4 +669,45 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
return this[key] as Map<String, Map<String, Any>>
}

fun `test execute monitor with non-null owner`() {

val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val alertCategories = AlertCategory.values()
val actionExecutionScope = PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()
)
val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope)
val actions = (0..randomInt(10)).map {
randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = actionExecutionPolicy
)
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
try {
createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
owner = "owner"
)
)
fail("Expected create monitor to fail")
} catch (e: ResponseException) {
assertTrue(e.message!!.contains("illegal_argument_exception"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
Expand All @@ -21,6 +24,7 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
Expand Down Expand Up @@ -319,6 +323,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorId,
owner = "security_analytics_plugin",
dataSources = DataSources(
alertsIndex = customAlertsIndex,
queryIndex = customQueryIndex,
Expand All @@ -328,6 +333,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId
)
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
executeMonitorResponse = executeMonitor(updateMonitorResponse!!.monitor, monitorId, false)
Assert.assertTrue(client().admin().cluster().state(ClusterStateRequest()).get().state.routingTable.hasIndex(customQueryIndex))
Expand All @@ -347,6 +353,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
var searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0)
searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin"))
searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1)
}

fun `test execute GetFindingsAction with monitorId param`() {
Expand Down Expand Up @@ -427,6 +443,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}

fun `test execute monitor with owner field`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(alertsIndex = customAlertsIndex),
owner = "owner"
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "owner")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute GetFindingsAction with unknown findingIndex param`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down
10 changes: 6 additions & 4 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,13 @@ fun randomDocumentLevelMonitor(
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
withMetadata: Boolean = false,
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), owner = owner
)
}

Expand All @@ -181,12 +182,13 @@ fun randomDocumentLevelMonitor(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
dataSources: DataSources
dataSources: DataSources,
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
}

Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/mappings/scheduled-jobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
}
}
},
"owner": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"monitor_type": {
"type": "keyword"
},
Expand Down