Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Chained alerts triggers for workflows #456

Merged
merged 4 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ data class Alert(
val errorHistory: List<AlertError>,
val severity: String,
val actionExecutionResults: List<ActionExecutionResult>,
val aggregationResultBucket: AggregationResultBucket? = null
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
) : Writeable, ToXContent {

init {
Expand All @@ -47,6 +48,24 @@ data class Alert(
}
}

constructor(
startTime: Instant,
lastNotificationTime: Instant?,
state: State = State.ACTIVE,
errorMessage: String? = null,
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String,
chainedAlertTrigger: ChainedAlertTrigger,
workflow: Workflow
) : this(
monitorId = NO_ID, monitorName = "", monitorVersion = NO_VERSION, monitorUser = workflow.user,
triggerId = chainedAlertTrigger.id, triggerName = chainedAlertTrigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = emptyList(),
severity = chainedAlertTrigger.severity, actionExecutionResults = emptyList(), schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
monitor: Monitor,
trigger: QueryLevelTrigger,
Expand All @@ -56,13 +75,15 @@ data class Alert(
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList()
aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -75,13 +96,15 @@ data class Alert(
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
findingIds: List<String> = emptyList()
findingIds: List<String> = emptyList(),
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList()
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -95,13 +118,15 @@ data class Alert(
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION,
aggregationResultBucket: AggregationResultBucket,
findingIds: List<String> = emptyList()
findingIds: List<String> = emptyList(),
executionId: String? = null
) : this(
monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList()
aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList(),
executionId = executionId
)

constructor(
Expand All @@ -116,13 +141,15 @@ data class Alert(
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null
) : this(
id = id, monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion,
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds
aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds,
executionId = executionId
)

constructor(
Expand Down Expand Up @@ -171,7 +198,8 @@ data class Alert(
errorHistory = sin.readList(::AlertError),
severity = sin.readString(),
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString()
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -205,6 +233,7 @@ data class Alert(
} else {
out.writeBoolean(false)
}
out.writeOptionalString(executionId)
}

companion object {
Expand All @@ -229,6 +258,7 @@ data class Alert(
const val ALERT_HISTORY_FIELD = "alert_history"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results"
const val EXECUTION_ID_FIELD = "execution_id"
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val NO_ID = ""
Expand All @@ -254,6 +284,7 @@ data class Alert(
var lastNotificationTime: Instant? = null
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
var executionId: String? = null
val errorHistory: MutableList<AlertError> = mutableListOf()
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
Expand Down Expand Up @@ -288,6 +319,7 @@ data class Alert(
LAST_NOTIFICATION_TIME_FIELD -> lastNotificationTime = xcp.instant()
ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant()
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
EXECUTION_ID_FIELD -> executionId = xcp.textOrNull()
ALERT_HISTORY_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
Expand Down Expand Up @@ -323,7 +355,7 @@ data class Alert(
lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage, errorHistory = errorHistory, severity = severity,
actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds,
relatedDocIds = relatedDocIds
relatedDocIds = relatedDocIds, executionId = executionId
)
}

Expand All @@ -349,6 +381,7 @@ data class Alert(
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(MONITOR_VERSION_FIELD, monitorVersion)
.field(MONITOR_NAME_FIELD, monitorName)
.field(EXECUTION_ID_FIELD, executionId)

if (!secure) {
builder.optionalUserField(MONITOR_USER_FIELD, monitorUser)
Expand Down Expand Up @@ -379,6 +412,7 @@ data class Alert(
ALERT_VERSION_FIELD to version,
END_TIME_FIELD to endTime?.toEpochMilli(),
ERROR_MESSAGE_FIELD to errorMessage,
EXECUTION_ID_FIELD to executionId,
LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(),
SEVERITY_FIELD to severity,
START_TIME_FIELD to startTime.toEpochMilli(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.CheckedFunction
import org.opensearch.common.UUIDs
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD
import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.core.ParseField
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.script.Script
import java.io.IOException

data class ChainedAlertTrigger(
override val id: String = UUIDs.base64UUID(),
override val name: String,
override val severity: String,
override val actions: List<Action>,
val condition: Script
) : Trigger {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readString(), // name
sin.readString(), // severity
sin.readList(::Action), // actions
Script(sin)
)

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.startObject(CHAINED_ALERT_TRIGGER_FIELD)
.field(ID_FIELD, id)
.field(NAME_FIELD, name)
.field(SEVERITY_FIELD, severity)
.startObject(CONDITION_FIELD)
.field(SCRIPT_FIELD, condition)
.endObject()
.field(ACTIONS_FIELD, actions.toTypedArray())
.endObject()
.endObject()
return builder
}

override fun name(): String {
return CHAINED_ALERT_TRIGGER_FIELD
}

/** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */
fun asTemplateArg(): Map<String, Any> {
return mapOf(
ID_FIELD to id,
NAME_FIELD to name,
SEVERITY_FIELD to severity,
ACTIONS_FIELD to actions.map { it.asTemplateArg() }
)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
out.writeCollection(actions)
condition.writeTo(out)
}

companion object {
const val CHAINED_ALERT_TRIGGER_FIELD = "chained_alert_trigger"
const val CONDITION_FIELD = "condition"
const val SCRIPT_FIELD = "script"
const val QUERY_IDS_FIELD = "query_ids"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
Trigger::class.java,
ParseField(CHAINED_ALERT_TRIGGER_FIELD),
CheckedFunction { parseInner(it) }
)

@JvmStatic
@Throws(IOException::class)
fun parseInner(xcp: XContentParser): ChainedAlertTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
lateinit var condition: Script
val actions: MutableList<Action> = mutableListOf()

if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) {
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation)
}

// If the parser began on START_OBJECT, move to the next token so that the while loop enters on
// the fieldName (or END_OBJECT if it's empty).
if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken()

while (xcp.currentToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()

xcp.nextToken()
when (fieldName) {
ID_FIELD -> id = xcp.text()
NAME_FIELD -> name = xcp.text()
SEVERITY_FIELD -> severity = xcp.text()
CONDITION_FIELD -> {
xcp.nextToken()
condition = Script.parse(xcp)
require(condition.lang == Script.DEFAULT_SCRIPT_LANG) {
"Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]"
}
xcp.nextToken()
}
ACTIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
}
xcp.nextToken()
}

return ChainedAlertTrigger(
name = requireNotNull(name) { "Trigger name is null" },
severity = requireNotNull(severity) { "Trigger severity is null" },
condition = requireNotNull(condition) { "Trigger condition is null" },
actions = requireNotNull(actions) { "Trigger actions are null" },
id = requireNotNull(id) { "Trigger id is null." }
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): ChainedAlertTrigger {
return ChainedAlertTrigger(sin)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ interface Trigger : BaseModel {
DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD),
QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD),
BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD),
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD);
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD),
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD);

override fun toString(): String {
return value
Expand Down Expand Up @@ -53,6 +54,7 @@ interface Trigger : BaseModel {
Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin)
Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin)
Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin)
Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Loading