From d1044b2f5bf39c94c8fc69055d2bd0f5a6f4624d Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Tue, 9 May 2023 14:42:44 -0700 Subject: [PATCH] adds models for chained alert trigger and chained alerts (#426) Signed-off-by: Surya Sashank Nistala --- .../commons/alerting/model/Alert.kt | 56 ++++-- .../alerting/model/ChainedAlertTrigger.kt | 161 ++++++++++++++++++ .../commons/alerting/model/Trigger.kt | 4 +- .../commons/alerting/model/Workflow.kt | 37 +++- .../alerting/AlertingPluginInterfaceTests.kt | 1 - .../commons/alerting/TestHelpers.kt | 24 +++ .../action/GetFindingsResponseTests.kt | 5 +- .../action/IndexWorkflowResponseTests.kt | 16 ++ 8 files changed, 283 insertions(+), 21 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt index 032c484a..c965778c 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt @@ -38,7 +38,8 @@ data class Alert( val errorHistory: List, val severity: String, val actionExecutionResults: List, - val aggregationResultBucket: AggregationResultBucket? = null + val aggregationResultBucket: AggregationResultBucket? = null, + val workflowExecutionId: String? = null, ) : Writeable, ToXContent { init { @@ -47,6 +48,24 @@ data class Alert( } } + constructor( + startTime: Instant, + lastNotificationTime: Instant?, + state: State = State.ACTIVE, + errorMessage: String? = null, + schemaVersion: Int = NO_SCHEMA_VERSION, + workflowExecutionId: String, + chainedAlertTrigger: ChainedAlertTrigger, + user: User + ) : this( + monitorId = NO_ID, monitorName = "", monitorVersion = NO_VERSION, monitorUser = user, + triggerId = chainedAlertTrigger.id, triggerName = chainedAlertTrigger.name, state = state, startTime = startTime, + lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = emptyList(), + severity = chainedAlertTrigger.severity, actionExecutionResults = emptyList(), schemaVersion = schemaVersion, + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(), + workflowExecutionId = workflowExecutionId + ) + constructor( monitor: Monitor, trigger: QueryLevelTrigger, @@ -56,13 +75,15 @@ data class Alert( errorMessage: String? = null, errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), - schemaVersion: Int = NO_SCHEMA_VERSION + schemaVersion: Int = NO_SCHEMA_VERSION, + workflowExecutionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList() + aggregationResultBucket = null, findingIds = emptyList(), relatedDocIds = emptyList(), + workflowExecutionId = workflowExecutionId ) constructor( @@ -75,13 +96,15 @@ data class Alert( errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), schemaVersion: Int = NO_SCHEMA_VERSION, - findingIds: List = emptyList() + findingIds: List = emptyList(), + workflowExecutionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList() + aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = emptyList(), + workflowExecutionId = workflowExecutionId ) constructor( @@ -95,13 +118,15 @@ data class Alert( actionExecutionResults: List = mutableListOf(), schemaVersion: Int = NO_SCHEMA_VERSION, aggregationResultBucket: AggregationResultBucket, - findingIds: List = emptyList() + findingIds: List = emptyList(), + workflowExecutionId: String? = null ) : this( monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList() + aggregationResultBucket = aggregationResultBucket, findingIds = findingIds, relatedDocIds = emptyList(), + workflowExecutionId = workflowExecutionId ) constructor( @@ -116,13 +141,15 @@ data class Alert( errorMessage: String? = null, errorHistory: List = mutableListOf(), actionExecutionResults: List = mutableListOf(), - schemaVersion: Int = NO_SCHEMA_VERSION + schemaVersion: Int = NO_SCHEMA_VERSION, + workflowExecutionId: String? = null ) : this( id = id, monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version, monitorUser = monitor.user, triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime, lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion, - aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds + aggregationResultBucket = null, findingIds = findingIds, relatedDocIds = relatedDocIds, + workflowExecutionId = workflowExecutionId ) enum class State { @@ -153,7 +180,8 @@ data class Alert( errorHistory = sin.readList(::AlertError), severity = sin.readString(), actionExecutionResults = sin.readList(::ActionExecutionResult), - aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null + aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null, + workflowExecutionId = sin.readOptionalString() ) fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED) @@ -187,6 +215,7 @@ data class Alert( } else { out.writeBoolean(false) } + out.writeOptionalString(workflowExecutionId) } companion object { @@ -211,6 +240,7 @@ data class Alert( const val ALERT_HISTORY_FIELD = "alert_history" const val SEVERITY_FIELD = "severity" const val ACTION_EXECUTION_RESULTS_FIELD = "action_execution_results" + const val WORKFLOW_EXECUTION_ID_FIELD = "workflow_execution_id" const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH const val NO_ID = "" @@ -236,6 +266,7 @@ data class Alert( var lastNotificationTime: Instant? = null var acknowledgedTime: Instant? = null var errorMessage: String? = null + var workflowExecutionId: String? = null val errorHistory: MutableList = mutableListOf() val actionExecutionResults: MutableList = mutableListOf() var aggAlertBucket: AggregationResultBucket? = null @@ -270,6 +301,7 @@ data class Alert( LAST_NOTIFICATION_TIME_FIELD -> lastNotificationTime = xcp.instant() ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant() ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull() + WORKFLOW_EXECUTION_ID_FIELD -> workflowExecutionId = xcp.textOrNull() ALERT_HISTORY_FIELD -> { ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { @@ -305,7 +337,7 @@ data class Alert( lastNotificationTime = lastNotificationTime, acknowledgedTime = acknowledgedTime, errorMessage = errorMessage, errorHistory = errorHistory, severity = severity, actionExecutionResults = actionExecutionResults, aggregationResultBucket = aggAlertBucket, findingIds = findingIds, - relatedDocIds = relatedDocIds + relatedDocIds = relatedDocIds, workflowExecutionId = workflowExecutionId ) } @@ -331,6 +363,7 @@ data class Alert( .field(SCHEMA_VERSION_FIELD, schemaVersion) .field(MONITOR_VERSION_FIELD, monitorVersion) .field(MONITOR_NAME_FIELD, monitorName) + .field(WORKFLOW_EXECUTION_ID_FIELD, workflowExecutionId) if (!secure) { builder.optionalUserField(MONITOR_USER_FIELD, monitorUser) @@ -361,6 +394,7 @@ data class Alert( ALERT_VERSION_FIELD to version, END_TIME_FIELD to endTime?.toEpochMilli(), ERROR_MESSAGE_FIELD to errorMessage, + WORKFLOW_EXECUTION_ID_FIELD to workflowExecutionId, LAST_NOTIFICATION_TIME_FIELD to lastNotificationTime?.toEpochMilli(), SEVERITY_FIELD to severity, START_TIME_FIELD to startTime.toEpochMilli(), diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt new file mode 100644 index 00000000..16c8cb20 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedAlertTrigger.kt @@ -0,0 +1,161 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.UUIDs +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.model.Trigger.Companion.ACTIONS_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.ID_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.NAME_FIELD +import org.opensearch.commons.alerting.model.Trigger.Companion.SEVERITY_FIELD +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.ParseField +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.script.Script +import java.io.IOException + +data class ChainedAlertTrigger( + override val id: String = UUIDs.base64UUID(), + override val name: String, + override val severity: String, + override val actions: List, + val condition: Script +) : Trigger { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readString(), // name + sin.readString(), // severity + sin.readList(::Action), // actions + Script(sin) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(CHAINED_ALERT_TRIGGER_FIELD) + .field(ID_FIELD, id) + .field(NAME_FIELD, name) + .field(SEVERITY_FIELD, severity) + .startObject(CONDITION_FIELD) + .field(SCRIPT_FIELD, condition) + .endObject() + .field(ACTIONS_FIELD, actions.toTypedArray()) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return CHAINED_ALERT_TRIGGER_FIELD + } + + /** Returns a representation of the trigger suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf( + ID_FIELD to id, + NAME_FIELD to name, + SEVERITY_FIELD to severity, + ACTIONS_FIELD to actions.map { it.asTemplateArg() } + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + out.writeString(severity) + out.writeCollection(actions) + condition.writeTo(out) + } + + companion object { + const val CHAINED_ALERT_TRIGGER_FIELD = "chained_alert_trigger" + const val CONDITION_FIELD = "condition" + const val SCRIPT_FIELD = "script" + const val QUERY_IDS_FIELD = "query_ids" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + Trigger::class.java, + ParseField(CHAINED_ALERT_TRIGGER_FIELD), + CheckedFunction { parseInner(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parseInner(xcp: XContentParser): ChainedAlertTrigger { + var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified + lateinit var name: String + lateinit var severity: String + lateinit var condition: Script + val queryIds: MutableList = mutableListOf() + val actions: MutableList = mutableListOf() + + if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation) + } + + // If the parser began on START_OBJECT, move to the next token so that the while loop enters on + // the fieldName (or END_OBJECT if it's empty). + if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken() + + while (xcp.currentToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + + xcp.nextToken() + when (fieldName) { + ID_FIELD -> id = xcp.text() + NAME_FIELD -> name = xcp.text() + SEVERITY_FIELD -> severity = xcp.text() + CONDITION_FIELD -> { + xcp.nextToken() + condition = Script.parse(xcp) + require(condition.lang == Script.DEFAULT_SCRIPT_LANG) { + "Invalid script language. Allowed languages are [${Script.DEFAULT_SCRIPT_LANG}]" + } + xcp.nextToken() + } + QUERY_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + queryIds.add(xcp.text()) + } + } + ACTIONS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + actions.add(Action.parse(xcp)) + } + } + } + xcp.nextToken() + } + + return ChainedAlertTrigger( + name = requireNotNull(name) { "Trigger name is null" }, + severity = requireNotNull(severity) { "Trigger severity is null" }, + condition = requireNotNull(condition) { "Trigger condition is null" }, + actions = requireNotNull(actions) { "Trigger actions are null" }, + id = requireNotNull(id) { "Trigger id is null." } + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ChainedAlertTrigger { + return ChainedAlertTrigger(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt index 4b83fed3..995705ca 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Trigger.kt @@ -12,7 +12,8 @@ interface Trigger : BaseModel { enum class Type(val value: String) { DOCUMENT_LEVEL_TRIGGER(DocumentLevelTrigger.DOCUMENT_LEVEL_TRIGGER_FIELD), QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD), - BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD); + BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD), + CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD); override fun toString(): String { return value @@ -52,6 +53,7 @@ interface Trigger : BaseModel { Type.QUERY_LEVEL_TRIGGER -> QueryLevelTrigger(sin) Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin) Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin) + Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin) // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns // enum can be null in Java else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt index fd563cc8..d2f2518d 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -35,7 +35,8 @@ data class Workflow( val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, - val owner: String? = DEFAULT_OWNER + val owner: String? = DEFAULT_OWNER, + val triggers: List ) : ScheduledJob { override val type = WORKFLOW_TYPE @@ -46,6 +47,11 @@ data class Workflow( require(enabledTime == null) } require(inputs.size <= WORKFLOW_MAX_INPUTS) { "Workflows can only have $WORKFLOW_MAX_INPUTS search input." } + triggers.forEach { trigger -> + run { + require(trigger is ChainedAlertTrigger) { "Incompatible trigger [${trigger.name}] for workflow. " } + } + } } @Throws(IOException::class) @@ -63,7 +69,8 @@ data class Workflow( } else null, schemaVersion = sin.readInt(), inputs = sin.readList((WorkflowInput)::readFrom), - owner = sin.readOptionalString() + owner = sin.readOptionalString(), + triggers = sin.readList((Trigger)::readFrom) ) // This enum classifies different workflows @@ -109,6 +116,7 @@ data class Workflow( .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) .field(SCHEDULE_FIELD, schedule) .field(INPUTS_FIELD, inputs.toTypedArray()) + .field(TRIGGERS_FIELD, triggers.toTypedArray()) .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) builder.field(OWNER_FIELD, owner) if (params.paramAsBoolean("with_type", false)) builder.endObject() @@ -143,11 +151,17 @@ data class Workflow( } // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it out.writeOptionalString(owner) + out.writeVInt(triggers.size) + triggers.forEach { + when (it) { + is ChainedAlertTrigger -> out.writeEnum(Trigger.Type.CHAINED_ALERT_TRIGGER) + else -> throw IOException("Unsupported trigger type for workflow") + } + it.writeTo(out) + } } companion object { - const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" - const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" const val WORKFLOW_TYPE = "workflow" const val TYPE_FIELD = "type" const val WORKFLOW_TYPE_FIELD = "workflow_type" @@ -161,6 +175,7 @@ data class Workflow( const val INPUTS_FIELD = "inputs" const val LAST_UPDATE_TIME_FIELD = "last_update_time" const val ENABLED_TIME_FIELD = "enabled_time" + const val TRIGGERS_FIELD = "triggers" const val OWNER_FIELD = "owner" // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all @@ -184,6 +199,7 @@ data class Workflow( var enabled = true var schemaVersion = NO_SCHEMA_VERSION val inputs: MutableList = mutableListOf() + val triggers: MutableList = mutableListOf() var owner = DEFAULT_OWNER XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -206,6 +222,16 @@ data class Workflow( } ENABLED_FIELD -> enabled = xcp.booleanValue() SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + Monitor.TRIGGERS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + triggers.add(Trigger.parse(xcp)) + } + } INPUTS_FIELD -> { XContentParserUtils.ensureExpectedToken( XContentParser.Token.START_ARRAY, @@ -245,7 +271,8 @@ data class Workflow( user, schemaVersion, inputs.toList(), - owner + owner, + triggers ) } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 781097e4..69eb756c 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -74,7 +74,6 @@ internal class AlertingPluginInterfaceTests { ) val listener: ActionListener = mock(ActionListener::class.java) as ActionListener - val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) Mockito.doAnswer { (it.getArgument(2) as ActionListener) diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 666e5c91..f8e86965 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -21,6 +21,7 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput import org.opensearch.commons.alerting.model.CompositeInput @@ -169,6 +170,7 @@ fun randomWorkflow( enabled: Boolean = Random().nextBoolean(), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomChainedAlertTrigger() }, ): Workflow { val delegates = mutableListOf() if (!monitorIds.isNullOrEmpty()) { @@ -191,6 +193,7 @@ fun randomWorkflow( return Workflow( name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + triggers = triggers ) } @@ -202,10 +205,12 @@ fun randomWorkflowWithDelegates( enabled: Boolean = Random().nextBoolean(), enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), + triggers: List = (1..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomChainedAlertTrigger() }, ): Workflow { return Workflow( name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + triggers = triggers ) } @@ -284,6 +289,25 @@ fun randomDocumentLevelTrigger( ) } +fun randomChainedAlertTrigger( + id: String = UUIDs.base64UUID(), + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + severity: String = "1", + condition: Script = randomScript(), + actions: List = mutableListOf(), + destinationId: String = "" +): ChainedAlertTrigger { + return ChainedAlertTrigger( + id = id, + name = name, + severity = severity, + condition = condition, + actions = if (actions.isEmpty() && destinationId.isNotBlank()) { + (0..RandomNumbers.randomIntBetween(Random(), 0, 10)).map { randomAction(destinationId = destinationId) } + } else actions + ) +} + fun randomBucketSelectorExtAggregationBuilder( name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), bucketsPathsMap: MutableMap = mutableMapOf("avg" to "10"), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt index 1f7e733e..205b9839 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetFindingsResponseTests.kt @@ -10,7 +10,6 @@ import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.rest.RestStatus import java.time.Instant -import java.util.List internal class GetFindingsResponseTests { @@ -24,7 +23,7 @@ internal class GetFindingsResponseTests { "monitor_id1", "monitor_name1", "test_index1", - listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", List.of())), + listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", listOf())), Instant.now() ) val findingDocument1 = FindingDocument("test_index1", "doc1", true, "document 1 payload") @@ -42,7 +41,7 @@ internal class GetFindingsResponseTests { "monitor_id2", "monitor_name2", "test_index2", - listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", List.of())), + listOf(DocLevelQuery("1", "myQuery", "fieldA:valABC", listOf())), Instant.now() ) val findingDocument21 = FindingDocument("test_index2", "doc21", true, "document 21 payload") diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt index 523f5650..8565d093 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt @@ -4,8 +4,10 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.Test import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.model.ChainedAlertTrigger import org.opensearch.commons.alerting.model.CronSchedule import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.randomChainedAlertTrigger import org.opensearch.commons.alerting.randomUser import java.time.Instant import java.time.ZoneId @@ -30,6 +32,7 @@ class IndexWorkflowResponseTests { user = randomUser(), schemaVersion = 0, inputs = mutableListOf(), + triggers = listOf(randomChainedAlertTrigger()) ) val req = IndexWorkflowResponse("1234", 1L, 2L, 0L, workflow) Assertions.assertNotNull(req) @@ -41,5 +44,18 @@ class IndexWorkflowResponseTests { Assertions.assertEquals("1234", newReq.id) Assertions.assertEquals(1L, newReq.version) Assertions.assertNotNull(newReq.workflow) + Assertions.assertEquals(newReq.workflow.triggers.size, 1) + Assertions.assertEquals(newReq.workflow.triggers.get(0).name, req.workflow.triggers.get(0).name) + Assertions.assertEquals(newReq.workflow.triggers.get(0).id, req.workflow.triggers.get(0).id) + Assertions.assertEquals(newReq.workflow.triggers.get(0).severity, req.workflow.triggers.get(0).severity) + Assertions.assertEquals( + (newReq.workflow.triggers.get(0) as ChainedAlertTrigger).condition.idOrCode, + (req.workflow.triggers.get(0) as ChainedAlertTrigger).condition.idOrCode + ) + + Assertions.assertEquals( + (newReq.workflow.triggers.get(0) as ChainedAlertTrigger).condition.lang, + (req.workflow.triggers.get(0) as ChainedAlertTrigger).condition.lang + ) } }