Skip to content

Commit

Permalink
Adds owner field in monitor model (#579)
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored Oct 3, 2022
1 parent 3a844f8 commit cda8301
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 14 deletions.
15 changes: 12 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ data class Monitor(
val inputs: List<Input>,
val triggers: List<Trigger>,
val uiMetadata: Map<String, Any>,
val dataSources: DataSources = DataSources()
val dataSources: DataSources = DataSources(),
val owner: String? = "alerting"
) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -118,7 +119,8 @@ data class Monitor(
DataSources(sin)
} else {
DataSources()
}
},
owner = sin.readOptionalString()
)

// This enum classifies different Monitors
Expand Down Expand Up @@ -167,6 +169,7 @@ data class Monitor(
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata)
builder.field(DATA_SOURCES_FIELD, dataSources)
builder.field(OWNER_FIELD, owner)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -211,6 +214,7 @@ data class Monitor(
out.writeMap(uiMetadata)
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
out.writeOptionalString(owner)
}

companion object {
Expand All @@ -229,6 +233,7 @@ data class Monitor(
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val UI_METADATA_FIELD = "ui_metadata"
const val DATA_SOURCES_FIELD = "data_sources"
const val OWNER_FIELD = "owner"
const val ENABLED_TIME_FIELD = "enabled_time"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand Down Expand Up @@ -256,6 +261,7 @@ data class Monitor(
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()
var dataSources = DataSources()
var owner = "alerting"

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down Expand Up @@ -295,6 +301,8 @@ data class Monitor(
UI_METADATA_FIELD -> uiMetadata = xcp.map()
DATA_SOURCES_FIELD -> dataSources = if (xcp.currentToken() == Token.VALUE_NULL) DataSources()
else DataSources.parse(xcp)
OWNER_FIELD -> owner = if (xcp.currentToken() == Token.VALUE_NULL) owner
else xcp.text()
else -> {
xcp.skipChildren()
}
Expand All @@ -320,7 +328,8 @@ data class Monitor(
inputs.toList(),
triggers.toList(),
uiMetadata,
dataSources
dataSources,
owner
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Expand Down Expand Up @@ -124,6 +125,12 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}

private fun validateOwner(owner: String?) {
if (owner != "alerting") {
throw IllegalArgumentException("Invalid owner field")
}
}

private fun validateDataSources(monitor: Monitor) { // Data Sources will currently be supported only at transport layer.
if (monitor.dataSources != null) {
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.context
import org.opensearch.client.node.NodeClient
Expand All @@ -25,7 +24,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
Expand Down Expand Up @@ -97,14 +95,6 @@ class RestSearchMonitorAction(
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser())
searchSourceBuilder.fetchSource(context(request))

val queryBuilder = QueryBuilders.boolQuery().must(searchSourceBuilder.query())
if (index == SCHEDULED_JOBS_INDEX) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
}

searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
Expand All @@ -21,6 +22,10 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

Expand All @@ -43,6 +48,14 @@ class TransportSearchMonitorAction @Inject constructor(
}

override fun doExecute(task: Task, searchMonitorRequest: SearchMonitorRequest, actionListener: ActionListener<SearchResponse>) {
val searchSourceBuilder = searchMonitorRequest.searchRequest.source()
val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchSourceBuilder.query())
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
addOwnerFieldIfNotExists(searchMonitorRequest.searchRequest)
val user = readUserFromThreadContext(client)
client.threadPool().threadContext.stashContext().use {
resolve(searchMonitorRequest, actionListener, user)
Expand Down Expand Up @@ -78,4 +91,16 @@ class TransportSearchMonitorAction @Inject constructor(
}
)
}

private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {
if (searchRequest.source().query() == null || searchRequest.source().query().toString().contains("monitor.owner") == false) {
var boolQueryBuilder: BoolQueryBuilder = if (searchRequest.source().query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchRequest.source().query())
val bqb = BoolQueryBuilder()
bqb.should().add(BoolQueryBuilder().mustNot(ExistsQueryBuilder("monitor.owner")))
bqb.should().add(BoolQueryBuilder().must(MatchQueryBuilder("monitor.owner", "alerting")))
boolQueryBuilder.filter(bqb)
searchRequest.source().query(boolQueryBuilder)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,47 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
}
}

fun `test execute monitor with non-null owner`() {

val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val alertCategories = AlertCategory.values()
val actionExecutionScope = PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()
)
val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope)
val actions = (0..randomInt(10)).map {
randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = actionExecutionPolicy
)
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
try {
createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
owner = "owner"
)
)
fail("Expected create monitor to fail")
} catch (e: ResponseException) {
assertTrue(e.message!!.contains("illegal_argument_exception"))
}
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetAlertsRequest
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.DocLevelQuery
Expand All @@ -18,6 +21,7 @@ import org.opensearch.alerting.model.DataSources
import org.opensearch.alerting.model.Table
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import org.opensearch.index.query.MatchQueryBuilder
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
Expand All @@ -41,6 +45,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "alerting")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, true)
Expand Down Expand Up @@ -107,6 +112,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with owner field`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(alertsIndex = customAlertsIndex),
owner = "owner"
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "owner")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with custom query index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down Expand Up @@ -451,7 +497,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertNotNull(getMonitorResponse)
Assert.assertNotNull(getMonitorResponse.monitor)
val monitor = getMonitorResponse.monitor

val sr = SearchRequest(SCHEDULED_JOBS_INDEX)
val g =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(sr))
.get()
Assert.assertNotNull(g)
Assert.assertEquals(g.hits.hits.size, 1)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
Expand All @@ -473,6 +524,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorId,
owner = "security_analytics_plugin",
dataSources = DataSources(
alertsIndex = customAlertsIndex,
queryIndex = customQueryIndex,
Expand All @@ -482,6 +534,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId
)
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
if (updateMonitorResponse != null) {
executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false)
Expand All @@ -502,5 +555,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)

val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
var searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0)
searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin"))
searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1)
}
}
20 changes: 20 additions & 0 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ fun randomDocumentLevelMonitor(
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
inputs: List<Input> = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
dataSources: DataSources = DataSources(),
owner: String
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/mappings/scheduled-jobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
}
}
},
"owner": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"monitor_type": {
"type": "keyword"
},
Expand Down

0 comments on commit cda8301

Please sign in to comment.