Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MonitorRunner for Bucket-Level Alerting #155

Merged
merged 3 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ configurations.all {
}
}

configurations.testCompile {
exclude module: "securemock"
}

dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"

Expand All @@ -82,6 +86,7 @@ dependencies {
implementation "com.github.seancfoley:ipaddress:5.3.3"

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testCompile "org.mockito:mockito-core:2.23.0"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
362 changes: 296 additions & 66 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.ExecuteMonitorResponse
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.isBucketLevelMonitor
import org.opensearch.client.Client
import org.opensearch.common.inject.Inject
import org.opensearch.common.xcontent.LoggingDeprecationHandler
Expand Down Expand Up @@ -68,7 +69,11 @@ class TransportExecuteMonitorAction @Inject constructor(
val (periodStart, periodEnd) =
monitor.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execMonitorRequest.requestEnd.millis))
try {
val monitorRunResult = runner.runMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
val monitorRunResult = if (monitor.isBucketLevelMonitor()) {
runner.runBucketLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
} else {
runner.runQueryLevelMonitor(monitor, periodStart, periodEnd, execMonitorRequest.dryrun)
}
withContext(Dispatchers.IO) {
actionListener.onResponse(ExecuteMonitorResponse(monitorRunResult))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
Expand Down Expand Up @@ -154,3 +155,19 @@ fun AggregationResultBucket.getBucketKeysHash(): String = this.bucketKeys.joinTo

fun Action.getActionScope(): ActionExecutionScope.Type =
this.actionExecutionPolicy.actionExecutionScope.getExecutionScope()

fun BucketLevelTriggerRunResult.getCombinedTriggerRunResult(
prevTriggerRunResult: BucketLevelTriggerRunResult?
): BucketLevelTriggerRunResult {
if (prevTriggerRunResult == null) return this

// The aggregation results and action results across to two trigger run results should not have overlapping keys
// since they represent different pages of aggregations so a simple concatenation will combine them
val mergedAggregationResultBuckets = prevTriggerRunResult.aggregationResultBuckets + this.aggregationResultBuckets
val mergedActionResultsMap = (prevTriggerRunResult.actionResultsMap + this.actionResultsMap).toMutableMap()

// Update to the most recent error if it's not null, otherwise keep the old one
val error = this.error ?: prevTriggerRunResult.error

return this.copy(aggregationResultBuckets = mergedAggregationResultBuckets, actionResultsMap = mergedActionResultsMap, error = error)
}
346 changes: 313 additions & 33 deletions alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt

Large diffs are not rendered by default.

107 changes: 75 additions & 32 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
package org.opensearch.alerting

import org.apache.http.Header
import org.apache.http.HttpEntity
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtFilter
import org.opensearch.alerting.core.model.Input
Expand All @@ -35,18 +37,18 @@ import org.opensearch.alerting.elasticapi.string
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.Action
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.ActionExecutionPolicy
import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
Expand All @@ -55,9 +57,6 @@ import org.opensearch.alerting.model.destination.email.EmailAccount
import org.opensearch.alerting.model.destination.email.EmailEntry
import org.opensearch.alerting.model.destination.email.EmailGroup
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.commons.authuser.User
import org.apache.http.Header
import org.apache.http.HttpEntity
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.Response
Expand All @@ -72,6 +71,7 @@ import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.QueryBuilders
import org.opensearch.script.Script
import org.opensearch.script.ScriptType
Expand All @@ -97,9 +97,11 @@ fun randomQueryLevelMonitor(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

// Monitor of older versions without security.
Expand All @@ -113,9 +115,11 @@ fun randomQueryLevelMonitorWithoutUser(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

fun randomBucketLevelMonitor(
Expand All @@ -134,9 +138,11 @@ fun randomBucketLevelMonitor(
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
withMetadata: Boolean = false
): Monitor {
return Monitor(name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf())
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
}

fun randomQueryLevelTrigger(
Expand All @@ -152,7 +158,8 @@ fun randomQueryLevelTrigger(
name = name,
severity = severity,
condition = condition,
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions)
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions
)
}

fun randomBucketLevelTrigger(
Expand All @@ -168,7 +175,8 @@ fun randomBucketLevelTrigger(
name = name,
severity = severity,
bucketSelector = bucketSelector,
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions)
actions = if (actions.isEmpty()) (0..randomInt(10)).map { randomAction(destinationId = destinationId) } else actions
)
}

fun randomBucketSelectorExtAggregationBuilder(
Expand Down Expand Up @@ -247,13 +255,19 @@ fun randomThrottle(
fun randomActionExecutionPolicy(
throttle: Throttle = randomThrottle(),
actionExecutionScope: ActionExecutionScope = randomActionExecutionFrequency()
) = ActionExecutionPolicy(throttle, actionExecutionScope)
): ActionExecutionPolicy {
return if (actionExecutionScope is PerExecutionActionScope) {
// Return null for throttle when using PerExecutionActionScope since throttling is currently not supported for it
ActionExecutionPolicy(null, actionExecutionScope)
} else {
ActionExecutionPolicy(throttle, actionExecutionScope)
}
}

fun randomActionExecutionFrequency(): ActionExecutionScope {
return if (randomBoolean()) {
val alertCategories = AlertCategory.values()
PerAlertActionScope(
actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet())
PerAlertActionScope(actionableAlerts = (1..randomInt(alertCategories.size)).map { alertCategories[it - 1] }.toSet())
} else {
PerExecutionActionScope()
}
Expand All @@ -262,16 +276,24 @@ fun randomActionExecutionFrequency(): ActionExecutionScope {
fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert {
val trigger = randomQueryLevelTrigger()
val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult())
return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults)
return Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults
)
}

fun randomAlertWithAggregationResultBucket(monitor: Monitor = randomBucketLevelMonitor()): Alert {
val trigger = randomBucketLevelTrigger()
val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult())
return Alert(monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = AggregationResultBucket("parent_bucket_path_1",
listOf("bucket_key_1"), mapOf("k1" to "val1", "k2" to "val2")))
return Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults,
aggregationResultBucket = AggregationResultBucket(
"parent_bucket_path_1",
listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2")
)
)
}

fun randomEmailAccountMethod(): EmailAccount.MethodType {
Expand Down Expand Up @@ -332,17 +354,29 @@ fun randomBucketLevelTriggerRunResult(): BucketLevelTriggerRunResult {
map.plus(Pair("key1", randomActionRunResult()))
map.plus(Pair("key2", randomActionRunResult()))

val aggBucket1 = AggregationResultBucket("parent_bucket_path_1", listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2"))
val aggBucket2 = AggregationResultBucket("parent_bucket_path_2", listOf("bucket_key_2"),
mapOf("k1" to "val1", "k2" to "val2"))
val aggBucket1 = AggregationResultBucket(
"parent_bucket_path_1",
listOf("bucket_key_1"),
mapOf("k1" to "val1", "k2" to "val2")
)
val aggBucket2 = AggregationResultBucket(
"parent_bucket_path_2",
listOf("bucket_key_2"),
mapOf("k1" to "val1", "k2" to "val2")
)

val actionResultsMap: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
actionResultsMap[aggBucket1.getBucketKeysHash()] = map
actionResultsMap[aggBucket2.getBucketKeysHash()] = map

return BucketLevelTriggerRunResult("trigger-name", null,
mapOf(aggBucket1.getBucketKeysHash() to aggBucket1, aggBucket2.getBucketKeysHash() to aggBucket2), actionResultsMap)
return BucketLevelTriggerRunResult(
"trigger-name", null,
mapOf(
aggBucket1.getBucketKeysHash() to aggBucket1,
aggBucket2.getBucketKeysHash() to aggBucket2
),
actionResultsMap
)
}

fun randomActionRunResult(): ActionRunResult {
Expand All @@ -361,8 +395,15 @@ fun Monitor.toJsonString(): String {
}

fun randomUser(): User {
return User(OpenSearchRestTestCase.randomAlphaOfLength(10), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10),
OpenSearchRestTestCase.randomAlphaOfLength(10)), listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"), listOf("test_attr=test"))
return User(
OpenSearchRestTestCase.randomAlphaOfLength(10),
listOf(
OpenSearchRestTestCase.randomAlphaOfLength(10),
OpenSearchRestTestCase.randomAlphaOfLength(10)
),
listOf(OpenSearchRestTestCase.randomAlphaOfLength(10), "all_access"),
listOf("test_attr=test")
)
}

fun randomUserEmpty(): User {
Expand Down Expand Up @@ -440,7 +481,9 @@ fun parser(xc: String): XContentParser {
}

fun xContentRegistry(): NamedXContentRegistry {
return NamedXContentRegistry(listOf(
SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY) +
SearchModule(Settings.EMPTY, false, emptyList()).namedXContents)
return NamedXContentRegistry(
listOf(
SearchInput.XCONTENT_REGISTRY, QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY
) + SearchModule(Settings.EMPTY, false, emptyList()).namedXContents
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ class BucketSelectorExtAggregationBuilderTests : BasePipelineAggregationTestCase
params["foo"] = "bar"
}
val type = randomFrom(*ScriptType.values())
script =
Script(
type, if (type == ScriptType.STORED) null else
randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG), "script", params
)
script = Script(
type,
if (type == ScriptType.STORED) null else randomFrom("my_lang", Script.DEFAULT_SCRIPT_LANG),
"script", params
)
}
val parentBucketPath = randomAlphaOfLengthBetween(3, 20)
val filter = BucketSelectorExtFilter(IncludeExclude("foo.*", "bar.*"))
Expand Down
Loading