Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds owner field in monitor model #579

Merged
merged 7 commits into from
Oct 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ data class Monitor(
val inputs: List<Input>,
val triggers: List<Trigger>,
val uiMetadata: Map<String, Any>,
val dataSources: DataSources = DataSources()
val dataSources: DataSources = DataSources(),
val owner: String? = "alerting"
) : ScheduledJob {

override val type = MONITOR_TYPE
Expand Down Expand Up @@ -118,7 +119,8 @@ data class Monitor(
DataSources(sin)
} else {
DataSources()
}
},
owner = sin.readOptionalString()
)

// This enum classifies different Monitors
Expand Down Expand Up @@ -167,6 +169,7 @@ data class Monitor(
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
if (uiMetadata.isNotEmpty()) builder.field(UI_METADATA_FIELD, uiMetadata)
builder.field(DATA_SOURCES_FIELD, dataSources)
builder.field(OWNER_FIELD, owner)
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -211,6 +214,7 @@ data class Monitor(
out.writeMap(uiMetadata)
out.writeBoolean(dataSources != null) // for backward compatibility with pre-existing monitors which don't have datasources field
dataSources.writeTo(out)
out.writeOptionalString(owner)
}

companion object {
Expand All @@ -229,6 +233,7 @@ data class Monitor(
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val UI_METADATA_FIELD = "ui_metadata"
const val DATA_SOURCES_FIELD = "data_sources"
const val OWNER_FIELD = "owner"
const val ENABLED_TIME_FIELD = "enabled_time"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
Expand Down Expand Up @@ -256,6 +261,7 @@ data class Monitor(
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()
var dataSources = DataSources()
var owner = "alerting"

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down Expand Up @@ -295,6 +301,8 @@ data class Monitor(
UI_METADATA_FIELD -> uiMetadata = xcp.map()
DATA_SOURCES_FIELD -> dataSources = if (xcp.currentToken() == Token.VALUE_NULL) DataSources()
else DataSources.parse(xcp)
OWNER_FIELD -> owner = if (xcp.currentToken() == Token.VALUE_NULL) owner
else xcp.text()
else -> {
xcp.skipChildren()
}
Expand All @@ -320,7 +328,8 @@ data class Monitor(
inputs.toList(),
triggers.toList(),
uiMetadata,
dataSources
dataSources,
owner
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class RestIndexMonitorAction : BaseRestHandler() {
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Expand Down Expand Up @@ -124,6 +125,12 @@ class RestIndexMonitorAction : BaseRestHandler() {
}
}

private fun validateOwner(owner: String?) {
if (owner != "alerting") {
throw IllegalArgumentException("Invalid owner field")
}
}

private fun validateDataSources(monitor: Monitor) { // Data Sources will currently be supported only at transport layer.
if (monitor.dataSources != null) {
if (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.context
import org.opensearch.client.node.NodeClient
Expand All @@ -25,7 +24,6 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent.EMPTY_PARAMS
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
Expand Down Expand Up @@ -97,14 +95,6 @@ class RestSearchMonitorAction(
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser())
searchSourceBuilder.fetchSource(context(request))

val queryBuilder = QueryBuilders.boolQuery().must(searchSourceBuilder.query())
if (index == SCHEDULED_JOBS_INDEX) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
}

searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
val searchRequest = SearchRequest()
.source(searchSourceBuilder)
.indices(index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
Expand All @@ -21,6 +22,10 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
import org.opensearch.index.query.MatchQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

Expand All @@ -43,6 +48,14 @@ class TransportSearchMonitorAction @Inject constructor(
}

override fun doExecute(task: Task, searchMonitorRequest: SearchMonitorRequest, actionListener: ActionListener<SearchResponse>) {
val searchSourceBuilder = searchMonitorRequest.searchRequest.source()
val queryBuilder = if (searchSourceBuilder.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchSourceBuilder.query())
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
searchSourceBuilder.query(queryBuilder)
.seqNoAndPrimaryTerm(true)
.version(true)
addOwnerFieldIfNotExists(searchMonitorRequest.searchRequest)
val user = readUserFromThreadContext(client)
client.threadPool().threadContext.stashContext().use {
resolve(searchMonitorRequest, actionListener, user)
Expand Down Expand Up @@ -78,4 +91,16 @@ class TransportSearchMonitorAction @Inject constructor(
}
)
}

private fun addOwnerFieldIfNotExists(searchRequest: SearchRequest) {
if (searchRequest.source().query() == null || searchRequest.source().query().toString().contains("monitor.owner") == false) {
var boolQueryBuilder: BoolQueryBuilder = if (searchRequest.source().query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(searchRequest.source().query())
val bqb = BoolQueryBuilder()
bqb.should().add(BoolQueryBuilder().mustNot(ExistsQueryBuilder("monitor.owner")))
bqb.should().add(BoolQueryBuilder().must(MatchQueryBuilder("monitor.owner", "alerting")))
boolQueryBuilder.filter(bqb)
searchRequest.source().query(boolQueryBuilder)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,47 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
}
}

fun `test execute monitor with non-null owner`() {

val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val alertCategories = AlertCategory.values()
val actionExecutionScope = PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet()
)
val actionExecutionPolicy = ActionExecutionPolicy(actionExecutionScope)
val actions = (0..randomInt(10)).map {
randomActionWithPolicy(
template = randomTemplateScript("Hello {{ctx.monitor.name}}"),
destinationId = createDestination().id,
actionExecutionPolicy = actionExecutionPolicy
)
}

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = actions)
try {
createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
owner = "owner"
)
)
fail("Expected create monitor to fail")
} catch (e: ResponseException) {
assertTrue(e.message!!.contains("illegal_argument_exception"))
}
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ package org.opensearch.alerting
import org.junit.Assert
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetAlertsRequest
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.action.SearchMonitorRequest
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.DocLevelQuery
Expand All @@ -18,6 +21,7 @@ import org.opensearch.alerting.model.DataSources
import org.opensearch.alerting.model.Table
import org.opensearch.alerting.transport.AlertingSingleNodeTestCase
import org.opensearch.common.settings.Settings
import org.opensearch.index.query.MatchQueryBuilder
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
Expand All @@ -41,6 +45,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "alerting")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, true)
Expand Down Expand Up @@ -107,6 +112,47 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with owner field`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
dataSources = DataSources(alertsIndex = customAlertsIndex),
owner = "owner"
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
Assert.assertEquals(monitor.owner, "owner")
indexDoc(index, "1", testDoc)
val id = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, id, false)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1)
val alerts = searchAlerts(id, customAlertsIndex)
assertEquals("Alert saved for test monitor", 1, alerts.size)
val table = Table("asc", "id", null, 1, 0, "")
var getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
getAlertsResponse = client()
.execute(GetAlertsAction.INSTANCE, GetAlertsRequest(table, "ALL", "ALL", id, null))
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)
}

fun `test execute monitor with custom query index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down Expand Up @@ -451,7 +497,12 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Assert.assertNotNull(getMonitorResponse)
Assert.assertNotNull(getMonitorResponse.monitor)
val monitor = getMonitorResponse.monitor

val sr = SearchRequest(SCHEDULED_JOBS_INDEX)
val g =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(sr))
.get()
Assert.assertNotNull(g)
Assert.assertEquals(g.hits.hits.size, 1)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
Expand All @@ -473,6 +524,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
val updateMonitorResponse = updateMonitor(
monitor.copy(
id = monitorId,
owner = "security_analytics_plugin",
dataSources = DataSources(
alertsIndex = customAlertsIndex,
queryIndex = customQueryIndex,
Expand All @@ -482,6 +534,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
monitorId
)
Assert.assertNotNull(updateMonitorResponse)
Assert.assertEquals(updateMonitorResponse!!.monitor.owner, "security_analytics_plugin")
indexDoc(index, "2", testDoc)
if (updateMonitorResponse != null) {
executeMonitorResponse = executeMonitor(updateMonitorResponse.monitor, monitorId, false)
Expand All @@ -502,5 +555,16 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
.get()
Assert.assertTrue(getAlertsResponse != null)
Assert.assertTrue(getAlertsResponse.alerts.size == 1)

val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX)
var searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 0)
searchRequest.source().query(MatchQueryBuilder("monitor.owner", "security_analytics_plugin"))
searchMonitorResponse =
client().execute(SearchMonitorAction.INSTANCE, SearchMonitorRequest(searchRequest))
.get()
Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1)
}
}
20 changes: 20 additions & 0 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,26 @@ fun randomDocumentLevelMonitor(
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
inputs: List<Input> = listOf(DocLevelMonitorInput("description", listOf("index"), emptyList())),
schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES),
enabled: Boolean = randomBoolean(),
triggers: List<Trigger> = (1..randomInt(10)).map { randomQueryLevelTrigger() },
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false,
dataSources: DataSources = DataSources(),
owner: String
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
}

fun randomDocumentLevelMonitor(
name: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
user: User? = randomUser(),
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/resources/mappings/scheduled-jobs.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@
}
}
},
"owner": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"monitor_type": {
"type": "keyword"
},
Expand Down