From 5ef7ce41d316552adc302caaf270dbac5f39ff20 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Tue, 20 Jun 2023 08:52:32 -0700 Subject: [PATCH] Adds Chained alerts triggers for workflows (#456) (#457) * adds models for chained alert trigger and chained alerts (#426) Signed-off-by: Surya Sashank Nistala * fix execution id field in alerts (#429) Signed-off-by: Surya Sashank Nistala * accept workflow argument in chained alert constructor (#435) Signed-off-by: Surya Sashank Nistala * add tests for chained alert Signed-off-by: Surya Sashank Nistala --------- Signed-off-by: Surya Sashank Nistala (cherry picked from commit 5ef9b55771d8391f3234544e2aa64c1e379a487d) Co-authored-by: Surya Sashank Nistala Signed-off-by: AWSHurneyt --- .../commons/alerting/model/Alert.kt | 56 +++++-- .../alerting/model/ChainedAlertTrigger.kt | 150 ++++++++++++++++++ .../commons/alerting/model/Trigger.kt | 4 +- .../commons/alerting/model/Workflow.kt | 37 ++++- .../opensearch/commons/alerting/AlertTests.kt | 10 ++ .../alerting/AlertingPluginInterfaceTests.kt | 1 - .../commons/alerting/TestHelpers.kt | 41 +++++ .../alerting/action/GetAlertsResponseTests.kt | 2 +- .../action/GetFindingsResponseTests.kt | 5 +- .../action/IndexWorkflowResponseTests.kt | 16 ++ .../commons/alerting/model/WriteableTests.kt | 11 ++ 11 files changed, 311 insertions(+), 22 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 3c87011f..dc57fb53 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -38,7 +38,8 @@ data class Alert( val errorHistory: List, val severity: String, val actionExecutionResults: List, - val aggregationResultBucket: AggregationResultBucket? = null + val aggregationResultBucket: AggregationResultBucket? = null, + val executionId: String? = null, ) : Writeable, ToXContent { init { @@ -47,6 +48,24 @@ data class Alert( } } + constructor( + startTime: Instant, + lastNotificationTime: Instant?, + state: State = State.ACTIVE, + errorMessage: String? = null, + schemaVersion: Int = NO_SCHEMA_VERSION, + executionId: String, + chainedAlertTrigger: ChainedAlertTrigger, + workflow: Workflow + ) : this( + monitorId = NO_ID, monitorName = "", monitorVersion = NO_VERSION, monitorUser = workflow.user, + triggerId = chainedAlertTrigger.id, triggerName = chainedAlertTrigger.name, state = state, startTime = startTime, + lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = emptyList(), + severity = chainedAlertTrigger.severity, actionExecutionResults = emptyList(), schemaVersion = schemaVersion, + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(), + executionId = executionId + ) + constructor( monitor: Monitor, trigger: QueryLevelTrigger, @@ -56,13 +75,15 @@ data class Alert( errorMessage: String? = null, errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), - schemaVersion: Int = NO_SCHEMA_VERSION + schemaVersion: Int = NO_SCHEMA_VERSION, + executionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList() + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(), + executionId = executionId ) constructor( @@ -75,13 +96,15 @@ data class Alert( errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), schemaVersion: Int = NO_SCHEMA_VERSION, - findingIds: List = emptyList() + findingIds: List = emptyList(), + executionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList() + aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList(), + executionId = executionId ) constructor( @@ -95,13 +118,15 @@ data class Alert( actionExecutionResults: List = mutableListOf(), schemaVersion: Int = NO_SCHEMA_VERSION, aggregationResultBucket: AggregationResultBucket, - findingIds: List = emptyList() + findingIds: List = emptyList(), + executionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList() + aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList(), + executionId = executionId ) constructor( @@ -116,13 +141,15 @@ data class Alert( errorMessage: String? = null, errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), - schemaVersion: Int = NO_SCHEMA_VERSION + schemaVersion: Int = NO_SCHEMA_VERSION, + executionId: String? = null ) : this( id = id, monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds + aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds, + executionId = executionId ) constructor( @@ -171,7 +198,8 @@ data class Alert( errorHistory = sin.readList(::AlertError), severity = sin.readString(), actionExecutionResults = sin.readList(::ActionExecutionResult), - aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null + aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, + executionId = sin.readOptionalString() ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -205,6 +233,7 @@ data class Alert( } else { out.writeBoolean(false) } + out.writeOptionalString(executionId) } companion object { @@ -229,6 +258,7 @@ data class Alert( const val ALERT_HISTORY_FIELD = "alert_history" const val SEVERITY_FIELD = "severity" const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results" + const val EXECUTION_ID_FIELD = "execution_id" const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val NO_ID = "" @@ -254,6 +284,7 @@ data class Alert( var lastNotificationTime: Instant? = null var acknowledgedTime: Instant? = null var errorMessage: String? = null + var executionId: String? = null val errorHistory: MutableList = mutableListOf() val actionExecutionResults: MutableList = mutableListOf() var aggAlertBucket: AggregationResultBucket? = null @@ -288,6 +319,7 @@ data class Alert( LAST_NOTIFICATION_TIME_FIELD -> lastNotificationTime = xcp.instant() ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant() ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull() + EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() ALERT_HISTORY_FIELD -> { ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { @@ -323,7 +355,7 @@ data class Alert( lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = severity, actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds, - relatedDocIds = relatedDocIds + relatedDocIds = relatedDocIds, executionId = executionId ) } @@ -349,6 +381,7 @@ data class Alert( .field(SCHEMA_VERSION_FIELD, schemaVersion) .field(MONITOR_VERSION_FIELD, monitorVersion) .field(MONITOR_NAME_FIELD, monitorName) + .field(EXECUTION_ID_FIELD, executionId) if (!secure) { builder.optionalUserField(MONITOR_USER_FIELD, monitorUser) @@ -379,6 +412,7 @@ data class Alert( ALERT_VERSION_FIELD to version, END_TIME_FIELD to endTime?.toEpochMilli(), ERROR_MESSAGE_FIELD to errorMessage, + EXECUTION_ID_FIELD to executionId, LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(), SEVERITY_FIELD to severity, START_TIME_FIELD to startTime.toEpochMilli(), diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt new file mode 100644 index 00000000..debc4ca6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt @@ -0,0 +1,150 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.ParseField +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.script.Script +import java.io.IOException + +data class ChainedAlertTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val condition: Script +) : Trigger { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + Script(sin) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(CHAINED_ALERT_TRIGGER_FIELD) + .field(ID_FIELD, id) + .field(NAME_FIELD, name) + .field(SEVERITY_FIELD, severity) + .startObject(CONDITION_FIELD) + .field(SCRIPT_FIELD, condition) + .endObject() + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return CHAINED_ALERT_TRIGGER_FIELD + } + + /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() } + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + condition.writeTo(out) + } + + companion object { + const val CHAINED_ALERT_TRIGGER_FIELD = "chained_alert_trigger" + const val CONDITION_FIELD = "condition" + const val SCRIPT_FIELD = "script" + const val QUERY_IDS_FIELD = "query_ids" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, + ParseField(CHAINED_ALERT_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): ChainedAlertTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + lateinit var condition: Script + val actions: MutableList = mutableListOf() + + if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + + xcp.nextToken() + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_FIELD -> { + xcp.nextToken() + condition = Script.parse(xcp) + require(condition.lang == Script.DEFAULT_SCRIPT_LANG) { + "Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]" + } + xcp.nextToken() + } + ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + xcp.nextToken() + } + + return ChainedAlertTrigger( + name = requireNotNull(name) { "Trigger name is null" }, + severity = requireNotNull(severity) { "Trigger severity is null" }, + condition = requireNotNull(condition) { "Trigger condition is null" }, + actions = requireNotNull(actions) { "Trigger actions are null" }, + id = requireNotNull(id) { "Trigger id is null." } + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ChainedAlertTrigger { + return ChainedAlertTrigger(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index 254b6401..fdc7ae80 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -13,7 +13,8 @@ interface Trigger : BaseModel { DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD), QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD), BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD), - NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD); + NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD), + CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD); override fun toString(): String { return value @@ -53,6 +54,7 @@ interface Trigger : BaseModel { Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin) Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin) Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) + Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt index fd563cc8..d2f2518d 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -35,7 +35,8 @@ data class Workflow( val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, - val owner: String? = DEFAULT_OWNER + val owner: String? = DEFAULT_OWNER, + val triggers: List ) : ScheduledJob { override val type = WORKFLOW_TYPE @@ -46,6 +47,11 @@ data class Workflow( require(enabledTime == null) } require(inputs.size <= WORKFLOW_MAX_INPUTS) { "Workflows can only have $WORKFLOW_MAX_INPUTS search input." } + triggers.forEach { trigger -> + run { + require(trigger is ChainedAlertTrigger) { "Incompatible trigger [${trigger.name}] for workflow. " } + } + } } @Throws(IOException::class) @@ -63,7 +69,8 @@ data class Workflow( } else null, schemaVersion = sin.readInt(), inputs = sin.readList((WorkflowInput)::readFrom), - owner = sin.readOptionalString() + owner = sin.readOptionalString(), + triggers = sin.readList((Trigger)::readFrom) ) // This enum classifies different workflows @@ -109,6 +116,7 @@ data class Workflow( .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) .field(SCHEDULE_FIELD, schedule) .field(INPUTS_FIELD, inputs.toTypedArray()) + .field(TRIGGERS_FIELD, triggers.toTypedArray()) .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) builder.field(OWNER_FIELD, owner) if (params.paramAsBoolean("with_type", false)) builder.endObject() @@ -143,11 +151,17 @@ data class Workflow( } // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeOptionalString(owner) + out.writeVInt(triggers.size) + triggers.forEach { + when (it) { + is ChainedAlertTrigger -> out.writeEnum(Trigger.Type.CHAINED_ALERT_TRIGGER) + else -> throw IOException("Unsupported trigger type for workflow") + } + it.writeTo(out) + } } companion object { - const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" - const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" const val WORKFLOW_TYPE = "workflow" const val TYPE_FIELD = "type" const val WORKFLOW_TYPE_FIELD = "workflow_type" @@ -161,6 +175,7 @@ data class Workflow( const val INPUTS_FIELD = "inputs" const val LAST_UPDATE_TIME_FIELD = "last_update_time" const val ENABLED_TIME_FIELD = "enabled_time" + const val TRIGGERS_FIELD = "triggers" const val OWNER_FIELD = "owner" // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all @@ -184,6 +199,7 @@ data class Workflow( var enabled = true var schemaVersion = NO_SCHEMA_VERSION val inputs: MutableList = mutableListOf() + val triggers: MutableList = mutableListOf() var owner = DEFAULT_OWNER XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -206,6 +222,16 @@ data class Workflow( } ENABLED_FIELD -> enabled = xcp.booleanValue() SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + Monitor.TRIGGERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + triggers.add(Trigger.parse(xcp)) + } + } INPUTS_FIELD -> { XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_ARRAY, @@ -245,7 +271,8 @@ data class Workflow( user, schemaVersion, inputs.toList(), - owner + owner, + triggers ) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt index 23e47825..29b32bb5 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import org.opensearch.commons.alerting.model.Alert +import kotlin.test.assertTrue class AlertTests { @Test @@ -58,4 +59,13 @@ class AlertTests { val activeAlert = randomAlert().copy(state = Alert.State.ACTIVE) Assertions.assertFalse(activeAlert.isAcknowledged(), "Alert is acknowledged") } + + @Test + fun `test chained alert`() { + val workflow = randomWorkflow() + val trigger = randomChainedAlertTrigger() + val alert = randomChainedAlert(workflow = workflow, trigger = trigger) + assertEquals(alert.monitorId, "") + assertEquals(alert.id, "") + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 781097e4..69eb756c 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -74,7 +74,6 @@ internal class AlertingPluginInterfaceTests { ) val listener: ActionListener = mock(ActionListener::class.java) as ActionListener - val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) Mockito.doAnswer { (it.getArgument(2) as ActionListener) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index b7f463ed..b6721a0f 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -21,6 +21,7 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput import org.opensearch.commons.alerting.model.CompositeInput @@ -63,6 +64,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Random +import java.util.UUID const val ALL_ACCESS_ROLE = "all_access" @@ -170,6 +172,7 @@ fun randomWorkflow( enabled: Boolean = Random().nextBoolean(), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + triggers: List = listOf(randomChainedAlertTrigger()), ): Workflow { val delegates = mutableListOf() if (!monitorIds.isNullOrEmpty()) { @@ -192,6 +195,7 @@ fun randomWorkflow( return Workflow( name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + triggers = triggers ) } @@ -203,10 +207,12 @@ fun randomWorkflowWithDelegates( enabled: Boolean = Random().nextBoolean(), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomChainedAlertTrigger() }, ): Workflow { return Workflow( name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + triggers = triggers ) } @@ -285,6 +291,25 @@ fun randomDocumentLevelTrigger( ) } +fun randomChainedAlertTrigger( + id: String = UUIDs.base64UUID(), + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + severity: String = "1", + condition: Script = randomScript(), + actions: List = mutableListOf(), + destinationId: String = "" +): ChainedAlertTrigger { + return ChainedAlertTrigger( + id = id, + name = name, + severity = severity, + condition = condition, + actions = if (actions.isEmpty() && destinationId.isNotBlank()) { + (0..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomAction(destinationId = destinationId) } + } else actions + ) +} + fun randomBucketSelectorExtAggregationBuilder( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), @@ -470,6 +495,7 @@ fun xContentRegistry(): NamedXContentRegistry { QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, + ChainedAlertTrigger.XCONTENT_REGISTRY, NoOpTrigger.XCONTENT_REGISTRY ) + SearchModule(Settings.EMPTY, emptyList()).namedXContents ) @@ -496,6 +522,21 @@ fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert { ) } +fun randomChainedAlert( + workflow: Workflow = randomWorkflow(), + trigger: ChainedAlertTrigger = randomChainedAlertTrigger(), +): Alert { + return Alert( + startTime = Instant.now(), + lastNotificationTime = Instant.now(), + state = Alert.State.ACTIVE, + errorMessage = null, + executionId = UUID.randomUUID().toString(), + chainedAlertTrigger = trigger, + workflow = workflow + ) +} + fun randomActionExecutionResult( actionId: String = UUIDs.base64UUID(), lastExecutionTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponseTests.kt index 5d5b2364..f04745d3 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetAlertsResponseTests.kt @@ -97,7 +97,7 @@ class GetAlertsResponseTests { var actualXContentString = req.toXContent(builder(), ToXContent.EMPTY_PARAMS).string() val expectedXContentString = "{\"alerts\":[{\"id\":\"id\",\"version\":0,\"monitor_id\":\"monitorId\"," + "\"schema_version\":0,\"monitor_version\":0,\"monitor_name\":\"monitorName\"," + - "\"trigger_id\":\"triggerId\",\"trigger_name\":\"triggerName\"," + + "\"execution_id\":null,\"trigger_id\":\"triggerId\",\"trigger_name\":\"triggerName\"," + "\"finding_ids\":[],\"related_doc_ids\":[],\"state\":\"ACKNOWLEDGED\",\"error_message\":null,\"alert_history\":[]," + "\"severity\":\"severity\",\"action_execution_results\":[],\"start_time\":" + now.toEpochMilli() + ",\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}],\"totalAlerts\":1}" diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt index 7349078b..e5b391bb 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt @@ -10,7 +10,6 @@ import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.rest.RestStatus import java.time.Instant -import java.util.List internal class GetFindingsResponseTests { @@ -25,7 +24,7 @@ internal class GetFindingsResponseTests { "monitor_id1", "monitor_name1", "test_index1", - listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", List.of())), + listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", listOf())), Instant.now() ) val findingDocument1 = FindingDocument("test_index1", "doc1", true, "document 1 payload") @@ -44,7 +43,7 @@ internal class GetFindingsResponseTests { "monitor_id2", "monitor_name2", "test_index2", - listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", List.of())), + listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", listOf())), Instant.now() ) val findingDocument21 = FindingDocument("test_index2", "doc21", true, "document 21 payload") diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt index 523f5650..8565d093 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt @@ -4,8 +4,10 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.CronSchedule import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomUser import java.time.Instant import java.time.ZoneId @@ -30,6 +32,7 @@ class IndexWorkflowResponseTests { user = randomUser(), schemaVersion = 0, inputs = mutableListOf(), + triggers = listOf(randomChainedAlertTrigger()) ) val req = IndexWorkflowResponse("1234", 1L, 2L, 0L, workflow) Assertions.assertNotNull(req) @@ -41,5 +44,18 @@ class IndexWorkflowResponseTests { Assertions.assertEquals("1234", newReq.id) Assertions.assertEquals(1L, newReq.version) Assertions.assertNotNull(newReq.workflow) + Assertions.assertEquals(newReq.workflow.triggers.size, 1) + Assertions.assertEquals(newReq.workflow.triggers.get(0).name, req.workflow.triggers.get(0).name) + Assertions.assertEquals(newReq.workflow.triggers.get(0).id, req.workflow.triggers.get(0).id) + Assertions.assertEquals(newReq.workflow.triggers.get(0).severity, req.workflow.triggers.get(0).severity) + Assertions.assertEquals( + (newReq.workflow.triggers.get(0) as ChainedAlertTrigger).condition.idOrCode, + (req.workflow.triggers.get(0) as ChainedAlertTrigger).condition.idOrCode + ) + + Assertions.assertEquals( + (newReq.workflow.triggers.get(0) as ChainedAlertTrigger).condition.lang, + (req.workflow.triggers.get(0) as ChainedAlertTrigger).condition.lang + ) } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 9f5e26b9..a3e83026 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -10,6 +10,7 @@ import org.opensearch.commons.alerting.model.action.Throttle import org.opensearch.commons.alerting.randomAction import org.opensearch.commons.alerting.randomActionExecutionPolicy import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomDocumentLevelTrigger import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -111,6 +112,16 @@ class WriteableTests { Assertions.assertEquals(trigger, newTrigger, "Round tripping DocumentLevelTrigger doesn't work") } + @Test + fun `test chained alert trigger as stream`() { + val trigger = randomChainedAlertTrigger() + val out = BytesStreamOutput() + trigger.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newTrigger = ChainedAlertTrigger.readFrom(sin) + Assertions.assertEquals(trigger, newTrigger, "Round tripping DocumentLevelTrigger doesn't work") + } + @Test fun `test searchinput as stream`() { val input = SearchInput(emptyList(), SearchSourceBuilder())