Skip to content

Commit

Permalink
Update MonitorRunner for Bucket-Level Alerting (#155)
Browse files Browse the repository at this point in the history
* Update MonitorRunner for Bucket-Level Alerting

Signed-off-by: Mohammad Qureshi <[email protected]>

* Update regressed comment in MonitorRunnerIT

Signed-off-by: Mohammad Qureshi <[email protected]>

* Add TODO to break down runBucketLevelMonitor method in MonitorRunner

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Aug 27, 2021
1 parent 95306f0 commit 28f401d
Show file tree
Hide file tree
Showing 8 changed files with 741 additions and 156 deletions.
5 changes: 5 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ configurations.all {
}
}

configurations.testCompile {
exclude module: "securemock"
}

dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"

Expand All @@ -82,6 +86,7 @@ dependencies {
implementation "com.github.seancfoley:ipaddress:5.3.3"

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testCompile "org.mockito:mockito-core:2.23.0"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
364 changes: 298 additions & 66 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.ExecuteMonitorResponse
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isBucketLevelMonitor
import org.opensearch.client.Client
import org.opensearch.common.inject.Inject
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand Down Expand Up @@ -68,7 +69,11 @@ class TransportExecuteMonitorAction @Inject constructor(
val (periodStart, periodEnd) =
monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis))
try {
val monitorRunResult = runner.runMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
val monitorRunResult = if (monitor.isBucketLevelMonitor()) {
runner.runBucketLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
} else {
runner.runQueryLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
}
withContext(Dispatchers.IO) {
actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
Expand Down Expand Up @@ -154,3 +155,19 @@ fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinTo

fun Action.getActionScope(): ActionExecutionScope.Type =
this.actionExecutionPolicy.actionExecutionScope.getExecutionScope()

fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult(
prevTriggerRunResult: BucketLevelTriggerRunResult?
): BucketLevelTriggerRunResult {
if (prevTriggerRunResult == null) return this

// The aggregation results and action results across to two trigger run results should not have overlapping keys
// since they represent different pages of aggregations so a simple concatenation will combine them
val mergedAggregationResultBuckets = prevTriggerRunResult.aggregationResultBuckets + this.aggregationResultBuckets
val mergedActionResultsMap = (prevTriggerRunResult.actionResultsMap + this.actionResultsMap).toMutableMap()

// Update to the most recent error if it's not null, otherwise keep the old one
val error = this.error ?: prevTriggerRunResult.error

return this.copy(aggregationResultBuckets = mergedAggregationResultBuckets, actionResultsMap = mergedActionResultsMap, error = error)
}
344 changes: 312 additions & 32 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt

Large diffs are not rendered by default.

107 changes: 75 additions & 32 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
package org.opensearch.alerting

import org.apache.http.Header
import org.apache.http.HttpEntity
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.alerting.core.model.Input
Expand All @@ -35,18 +37,18 @@ import org.opensearch.alerting.elasticapi.string
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.ActionExecutionPolicy
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
Expand All @@ -55,9 +57,6 @@ import org.opensearch.alerting.model.destination.email.EmailAccount
import org.opensearch.alerting.model.destination.email.EmailEntry
import org.opensearch.alerting.model.destination.email.EmailGroup
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.commons.authuser.User
import org.apache.http.Header
import org.apache.http.HttpEntity
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
Expand All @@ -72,6 +71,7 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
Expand All @@ -97,9 +97,11 @@ fun randomQueryLevelMonitor(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

// Monitor of older versions without security.
Expand All @@ -113,9 +115,11 @@ fun randomQueryLevelMonitorWithoutUser(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

fun randomBucketLevelMonitor(
Expand All @@ -134,9 +138,11 @@ fun randomBucketLevelMonitor(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

fun randomQueryLevelTrigger(
Expand All @@ -152,7 +158,8 @@ fun randomQueryLevelTrigger(
name = name,
severity = severity,
condition = condition,
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions)
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions
)
}

fun randomBucketLevelTrigger(
Expand All @@ -168,7 +175,8 @@ fun randomBucketLevelTrigger(
name = name,
severity = severity,
bucketSelector = bucketSelector,
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions)
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions
)
}

fun randomBucketSelectorExtAggregationBuilder(
Expand Down Expand Up @@ -247,13 +255,19 @@ fun randomThrottle(
fun randomActionExecutionPolicy(
throttle: Throttle = randomThrottle(),
actionExecutionScope: ActionExecutionScope = randomActionExecutionFrequency()
) = ActionExecutionPolicy(throttle, actionExecutionScope)
): ActionExecutionPolicy {
return if (actionExecutionScope is PerExecutionActionScope) {
// Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it
ActionExecutionPolicy(null, actionExecutionScope)
} else {
ActionExecutionPolicy(throttle, actionExecutionScope)
}
}

fun randomActionExecutionFrequency(): ActionExecutionScope {
return if (randomBoolean()) {
val alertCategories = AlertCategory.values()
PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet())
PerAlertActionScope(actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet())
} else {
PerExecutionActionScope()
}
Expand All @@ -262,16 +276,24 @@ fun randomActionExecutionFrequency(): ActionExecutionScope {
fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert {
val trigger = randomQueryLevelTrigger()
val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult())
return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults)
return Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults
)
}

fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelMonitor()): Alert {
val trigger = randomBucketLevelTrigger()
val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult())
return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = AggregationResultBucket("parent_bucket_path_1",
listOf("bucket_key_1"), mapOf("k1" to "val1", "k2" to "val2")))
return Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults,
aggregationResultBucket = AggregationResultBucket(
"parent_bucket_path_1",
listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2")
)
)
}

fun randomEmailAccountMethod(): EmailAccount.MethodType {
Expand Down Expand Up @@ -332,17 +354,29 @@ fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult {
map.plus(Pair("key1", randomActionRunResult()))
map.plus(Pair("key2", randomActionRunResult()))

val aggBucket1 = AggregationResultBucket("parent_bucket_path_1", listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2"))
val aggBucket2 = AggregationResultBucket("parent_bucket_path_2", listOf("bucket_key_2"),
mapOf("k1" to "val1", "k2" to "val2"))
val aggBucket1 = AggregationResultBucket(
"parent_bucket_path_1",
listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2")
)
val aggBucket2 = AggregationResultBucket(
"parent_bucket_path_2",
listOf("bucket_key_2"),
mapOf("k1" to "val1", "k2" to "val2")
)

val actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
actionResultsMap[aggBucket1.getBucketKeysHash()] = map
actionResultsMap[aggBucket2.getBucketKeysHash()] = map

return BucketLevelTriggerRunResult("trigger-name", null,
mapOf(aggBucket1.getBucketKeysHash() to aggBucket1, aggBucket2.getBucketKeysHash() to aggBucket2), actionResultsMap)
return BucketLevelTriggerRunResult(
"trigger-name", null,
mapOf(
aggBucket1.getBucketKeysHash() to aggBucket1,
aggBucket2.getBucketKeysHash() to aggBucket2
),
actionResultsMap
)
}

fun randomActionRunResult(): ActionRunResult {
Expand All @@ -361,8 +395,15 @@ fun Monitor.toJsonString(): String {
}

fun randomUser(): User {
return User(OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10),
OpenSearchRestTestCase.randomAlphaOfLength(10)), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test"))
return User(
OpenSearchRestTestCase.randomAlphaOfLength(10),
listOf(
OpenSearchRestTestCase.randomAlphaOfLength(10),
OpenSearchRestTestCase.randomAlphaOfLength(10)
),
listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"),
listOf("test_attr=test")
)
}

fun randomUserEmpty(): User {
Expand Down Expand Up @@ -440,7 +481,9 @@ fun parser(xc: String): XContentParser {
}

fun xContentRegistry(): NamedXContentRegistry {
return NamedXContentRegistry(listOf(
SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY) +
SearchModule(Settings.EMPTY, false, emptyList()).namedXContents)
return NamedXContentRegistry(
listOf(
SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY
) + SearchModule(Settings.EMPTY, false, emptyList()).namedXContents
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class BucketSelectorExtAggregationBuilderTests : BasePipelineAggregationTestCase
params["foo"] = "bar"
}
val type = randomFrom(*ScriptType.values())
script =
Script(
type, if (type == ScriptType.STORED) null else
randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), "script", params
)
script = Script(
type,
if (type == ScriptType.STORED) null else randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG),
"script", params
)
}
val parentBucketPath = randomAlphaOfLengthBetween(3, 20)
val filter = BucketSelectorExtFilter(IncludeExclude("foo.*", "bar.*"))
Expand Down
Loading

0 comments on commit 28f401d

Please sign in to comment.