Skip to content

Commit

Permalink
Adds Chained alerts triggers for workflows (opensearch-project#456)
Browse files Browse the repository at this point in the history
* adds models for chained alert trigger and chained alerts (opensearch-project#426)

Signed-off-by: Surya Sashank Nistala <[email protected]>

* fix execution id field in alerts (opensearch-project#429)

Signed-off-by: Surya Sashank Nistala <[email protected]>

* accept workflow argument in chained alert constructor (opensearch-project#435)

Signed-off-by: Surya Sashank Nistala <[email protected]>

* add tests for chained alert

Signed-off-by: Surya Sashank Nistala <[email protected]>

---------

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored Jun 20, 2023
1 parent 1484bff commit 5ef9b55
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 22 deletions.
56 changes: 45 additions & 11 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ data class Alert(
val errorHistory: List<AlertError>,
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>,
val aggregationResultBucket: AggregationResultBucket? = null
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
) : Writeable, ToXContent {

init {
Expand All @@ -47,6 +48,24 @@ data class Alert(
}
}

constructor(
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String,
chainedAlertTrigger: ChainedAlertTrigger,
workflow: Workflow
) : this(
monitorId = NO_ID, monitorName = "", monitorVersion = NO_VERSION, monitorUser = workflow.user,
triggerId = chainedAlertTrigger.id, triggerName = chainedAlertTrigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = emptyList(),
severity = chainedAlertTrigger.severity, actionExecutionResults = emptyList(), schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
monitor: Monitor,
trigger: QueryLevelTrigger,
Expand All @@ -56,13 +75,15 @@ data class Alert(
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList()
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -75,13 +96,15 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
findingIds: List<String> = emptyList()
findingIds: List<String> = emptyList(),
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList()
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -95,13 +118,15 @@ data class Alert(
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket,
findingIds: List<String> = emptyList()
findingIds: List<String> = emptyList(),
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList()
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -116,13 +141,15 @@ data class Alert(
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null
) : this(
id = id, monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds,
executionId = executionId
)

constructor(
Expand Down Expand Up @@ -171,7 +198,8 @@ data class Alert(
errorHistory = sin.readList(::AlertError),
severity = sin.readString(),
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString()
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -205,6 +233,7 @@ data class Alert(
} else {
out.writeBoolean(false)
}
out.writeOptionalString(executionId)
}

companion object {
Expand All @@ -229,6 +258,7 @@ data class Alert(
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"
const val EXECUTION_ID_FIELD = "execution_id"
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val NO_ID = ""
Expand All @@ -254,6 +284,7 @@ data class Alert(
var lastNotificationTime: Instant? = null
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
var executionId: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
Expand Down Expand Up @@ -288,6 +319,7 @@ data class Alert(
LAST_NOTIFICATION_TIME_FIELD -> lastNotificationTime = xcp.instant()
ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant()
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
ALERT_HISTORY_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
Expand Down Expand Up @@ -323,7 +355,7 @@ data class Alert(
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds,
relatedDocIds = relatedDocIds
relatedDocIds = relatedDocIds, executionId = executionId
)
}

Expand All @@ -349,6 +381,7 @@ data class Alert(
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(MONITOR_VERSION_FIELD, monitorVersion)
.field(MONITOR_NAME_FIELD, monitorName)
.field(EXECUTION_ID_FIELD, executionId)

if (!secure) {
builder.optionalUserField(MONITOR_USER_FIELD, monitorUser)
Expand Down Expand Up @@ -379,6 +412,7 @@ data class Alert(
ALERT_VERSION_FIELD to version,
END_TIME_FIELD to endTime?.toEpochMilli(),
ERROR_MESSAGE_FIELD to errorMessage,
EXECUTION_ID_FIELD to executionId,
LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(),
SEVERITY_FIELD to severity,
START_TIME_FIELD to startTime.toEpochMilli(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.CheckedFunction
import org.opensearch.common.UUIDs
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.core.ParseField
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.script.Script
import java.io.IOException

data class ChainedAlertTrigger(
override val id: String = UUIDs.base64UUID(),
override val name: String,
override val severity: String,
override val actions: List<Action>,
val condition: Script
) : Trigger {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readString(), // name
sin.readString(), // severity
sin.readList(::Action), // actions
Script(sin)
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(CHAINED_ALERT_TRIGGER_FIELD)
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(SEVERITY_FIELD, severity)
.startObject(CONDITION_FIELD)
.field(SCRIPT_FIELD, condition)
.endObject()
.field(ACTIONS_FIELD, actions.toTypedArray())
.endObject()
.endObject()
return builder
}

override fun name(): String {
return CHAINED_ALERT_TRIGGER_FIELD
}

/** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */
fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
SEVERITY_FIELD to severity,
ACTIONS_FIELD to actions.map { it.asTemplateArg() }
)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
out.writeCollection(actions)
condition.writeTo(out)
}

companion object {
const val CHAINED_ALERT_TRIGGER_FIELD = "chained_alert_trigger"
const val CONDITION_FIELD = "condition"
const val SCRIPT_FIELD = "script"
const val QUERY_IDS_FIELD = "query_ids"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
Trigger::class.java,
ParseField(CHAINED_ALERT_TRIGGER_FIELD),
CheckedFunction { parseInner(it) }
)

@JvmStatic
@Throws(IOException::class)
fun parseInner(xcp: XContentParser): ChainedAlertTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
lateinit var condition: Script
val actions: MutableList<Action> = mutableListOf()

if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) {
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation)
}

// If the parser began on START_OBJECT, move to the next token so that the while loop enters on
// the fieldName (or END_OBJECT if it's empty).
if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken()

while (xcp.currentToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()

xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.text()
SEVERITY_FIELD -> severity = xcp.text()
CONDITION_FIELD -> {
xcp.nextToken()
condition = Script.parse(xcp)
require(condition.lang == Script.DEFAULT_SCRIPT_LANG) {
"Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]"
}
xcp.nextToken()
}
ACTIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
}
xcp.nextToken()
}

return ChainedAlertTrigger(
name = requireNotNull(name) { "Trigger name is null" },
severity = requireNotNull(severity) { "Trigger severity is null" },
condition = requireNotNull(condition) { "Trigger condition is null" },
actions = requireNotNull(actions) { "Trigger actions are null" },
id = requireNotNull(id) { "Trigger id is null." }
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): ChainedAlertTrigger {
return ChainedAlertTrigger(sin)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ interface Trigger : BaseModel {
DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD),
QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD),
BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD),
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD);
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD),
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD);

override fun toString(): String {
return value
Expand Down Expand Up @@ -53,6 +54,7 @@ interface Trigger : BaseModel {
Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin)
Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin)
Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin)
Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Loading

0 comments on commit 5ef9b55

Please sign in to comment.