Skip to content

Commit

Permalink
Refactor Monitor and Trigger to split into Query-Level and Bucket-Lev… (
Browse files Browse the repository at this point in the history
#150)

* Refactor Monitor and Trigger to split into Query-Level and Bucket-Level Monitors

Signed-off-by: Mohammad Qureshi <[email protected]>

* Require condition to not be null when parsing Bucket-Level Trigger

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Aug 25, 2021
1 parent ecd283f commit e8c474f
Show file tree
Hide file tree
Showing 38 changed files with 1,579 additions and 460 deletions.
13 changes: 10 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteDestinationAction
import org.opensearch.alerting.resthandler.RestDeleteEmailAccountAction
Expand Down Expand Up @@ -135,8 +137,8 @@ import java.util.function.Supplier
/**
* Entry point of the OpenDistro for Elasticsearch alerting plugin
* This class initializes the [RestGetMonitorAction], [RestDeleteMonitorAction], [RestIndexMonitorAction] rest handlers.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY] to the
* [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {

Expand Down Expand Up @@ -224,7 +226,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(Monitor.XCONTENT_REGISTRY, SearchInput.XCONTENT_REGISTRY)
return listOf(
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
)
}

override fun createComponents(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import java.io.IOException

class ExecuteMonitorResponse : ActionResponse, ToXContentObject {

val monitorRunResult: MonitorRunResult
val monitorRunResult: MonitorRunResult<*>

constructor(monitorRunResult: MonitorRunResult) : super() {
constructor(monitorRunResult: MonitorRunResult<*>) : super() {
this.monitorRunResult = monitorRunResult
}

Expand Down
91 changes: 73 additions & 18 deletions alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.alerting.elasticapi.instant
import org.opensearch.alerting.elasticapi.optionalTimeField
import org.opensearch.alerting.elasticapi.optionalUserField
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.authuser.User
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
Expand All @@ -39,7 +40,6 @@ import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import java.io.IOException
import java.time.Instant

Expand All @@ -61,7 +61,8 @@ data class Alert(
val errorMessage: String? = null,
val errorHistory: List<AlertError>,
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>
val actionExecutionResults: List<ActionExecutionResult>,
val aggregationResultBucket: AggregationResultBucket? = null
) : Writeable, ToXContent {

init {
Expand All @@ -72,20 +73,52 @@ data class Alert(

constructor(
monitor: Monitor,
trigger: Trigger,
trigger: QueryLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion
)
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)

constructor(
monitor: Monitor,
trigger: BucketLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null)

constructor(
monitor: Monitor,
trigger: BucketLevelTrigger,
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand All @@ -112,7 +145,8 @@ data class Alert(
errorMessage = sin.readOptionalString(),
errorHistory = sin.readList(::AlertError),
severity = sin.readString(),
actionExecutionResults = sin.readList(::ActionExecutionResult)
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand All @@ -138,6 +172,12 @@ data class Alert(
out.writeCollection(errorHistory)
out.writeString(severity)
out.writeCollection(actionExecutionResults)
if (aggregationResultBucket != null) {
out.writeBoolean(true)
aggregationResultBucket.writeTo(out)
} else {
out.writeBoolean(false)
}
}

companion object {
Expand All @@ -160,7 +200,8 @@ data class Alert(
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"

const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand All @@ -183,8 +224,8 @@ data class Alert(
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
var actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()

val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -217,18 +258,27 @@ data class Alert(
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
}
AggregationResultBucket.CONFIG_NAME -> {
// If an Alert with aggAlertBucket contents is indexed into the alerts index first, then
// that field will be added to the mappings.
// In this case, that field will default to null when it isn't present for Alerts created by Query-Level Monitors
// (even though the toXContent doesn't output the field) so null is being accounted for here.
aggAlertBucket = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) {
null
} else {
AggregationResultBucket.parse(xcp)
}
}
}
}

return Alert(
id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion, monitorUser = monitorUser,
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults
)
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket)
}

@JvmStatic
Expand All @@ -239,7 +289,7 @@ data class Alert(
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
builder.startObject()
.field(ALERT_ID_FIELD, id)
.field(ALERT_VERSION_FIELD, version)
.field(MONITOR_ID_FIELD, monitorId)
Expand All @@ -258,7 +308,9 @@ data class Alert(
.optionalTimeField(LAST_NOTIFICATION_TIME_FIELD, lastNotificationTime)
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
.endObject()
aggregationResultBucket?.innerXContent(builder)
builder.endObject()
return builder
}

fun asTemplateArg(): Map<String, Any?> {
Expand All @@ -271,7 +323,10 @@ data class Alert(
LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(),
SEVERITY_FIELD to severity,
START_TIME_FIELD to startTime.toEpochMilli(),
STATE_FIELD to state.toString()
STATE_FIELD to state.toString(),
// Converting bucket keys to comma separated String to avoid manipulation in Action mustache templates
BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","),
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting.model

import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.model.Trigger.Companion.ACTIONS_FIELD
import org.opensearch.alerting.model.Trigger.Companion.ID_FIELD
import org.opensearch.alerting.model.Trigger.Companion.NAME_FIELD
import org.opensearch.alerting.model.Trigger.Companion.SEVERITY_FIELD
import org.opensearch.alerting.model.action.Action
import org.opensearch.common.CheckedFunction
import org.opensearch.common.ParseField
import org.opensearch.common.UUIDs
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParser.Token
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import java.io.IOException

/**
* A multi-alert Trigger available with Bucket-Level Monitors that filters aggregation buckets via a pipeline
* aggregator.
*/
data class BucketLevelTrigger(
override val id: String = UUIDs.base64UUID(),
override val name: String,
override val severity: String,
val bucketSelector: BucketSelectorExtAggregationBuilder,
override val actions: List<Action>
) : Trigger {

@Throws(IOException::class)
constructor(sin: StreamInput): this(
sin.readString(), // id
sin.readString(), // name
sin.readString(), // severity
BucketSelectorExtAggregationBuilder(sin), // condition
sin.readList(::Action) // actions
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(BUCKET_LEVEL_TRIGGER_FIELD)
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(SEVERITY_FIELD, severity)
.startObject(CONDITION_FIELD)
bucketSelector.internalXContent(builder, params)
builder.endObject()
.field(ACTIONS_FIELD, actions.toTypedArray())
.endObject()
.endObject()
return builder
}

override fun name(): String {
return BUCKET_LEVEL_TRIGGER_FIELD
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
bucketSelector.writeTo(out)
out.writeCollection(actions)
}

fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
SEVERITY_FIELD to severity,
ACTIONS_FIELD to actions.map { it.asTemplateArg() },
PARENT_BUCKET_PATH to getParentBucketPath()
)
}

fun getParentBucketPath(): String {
return bucketSelector.parentBucketPath
}

companion object {
const val BUCKET_LEVEL_TRIGGER_FIELD = "bucket_level_trigger"
const val CONDITION_FIELD = "condition"
const val PARENT_BUCKET_PATH = "parentBucketPath"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Trigger::class.java, ParseField(BUCKET_LEVEL_TRIGGER_FIELD),
CheckedFunction { parseInner(it) })

@JvmStatic
@Throws(IOException::class)
fun parseInner(xcp: XContentParser): BucketLevelTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
val actions: MutableList<Action> = mutableListOf()
ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
lateinit var bucketSelector: BucketSelectorExtAggregationBuilder

while (xcp.nextToken() != Token.END_OBJECT) {
val fieldName = xcp.currentName()

xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.text()
SEVERITY_FIELD -> severity = xcp.text()
CONDITION_FIELD -> {
// Using the trigger id as the name in the bucket selector since it is validated for uniqueness within Monitors.
// The contents of the trigger definition are round-tripped through parse and toXContent during Monitor creation
// ensuring that the id is available here in the version of the Monitor object that will be executed, even if the
// user submitted a custom trigger id after the condition definition.
bucketSelector = BucketSelectorExtAggregationBuilder.parse(id, xcp)
}
ACTIONS_FIELD -> {
ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
}
}

return BucketLevelTrigger(
id = requireNotNull(id) { "Trigger id is null." },
name = requireNotNull(name) { "Trigger name is null" },
severity = requireNotNull(severity) { "Trigger severity is null" },
bucketSelector = requireNotNull(bucketSelector) { "Trigger condition is null" },
actions = requireNotNull(actions) { "Trigger actions are null" })
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): BucketLevelTrigger {
return BucketLevelTrigger(sin)
}
}
}
Loading

0 comments on commit e8c474f

Please sign in to comment.