diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 7dd3e6e5..eafe8e69 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.notifications.action.BaseResponse @@ -55,6 +61,7 @@ object AlertingPluginInterface { } ) } + fun deleteMonitor( client: NodeClient, request: DeleteMonitorRequest, @@ -73,6 +80,49 @@ object AlertingPluginInterface { ) } + /** + * Index monitor interface. + * @param client Node client for making transport action + * @param request The request object + * @param namedWriteableRegistry Registry for building aggregations + * @param listener The listener for getting response + */ + fun indexWorkflow( + client: NodeClient, + request: IndexWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + IndexWorkflowResponse( + it + ) + } + } + ) + } + + fun deleteWorkflow( + client: NodeClient, + request: DeleteWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + DeleteWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Alerts interface. * @param client Node client for making transport action @@ -97,6 +147,30 @@ object AlertingPluginInterface { ) } + /** + * Get Workflow interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun getWorkflow( + client: NodeClient, + request: GetWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.GET_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + GetWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Findings interface. * @param client Node client for making transport action diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index 1d7a1edd..f0535af7 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -8,8 +8,11 @@ import org.opensearch.action.ActionType object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" + const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write" const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get" + const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get" const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete" + const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete" const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get" const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack" const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe" @@ -18,12 +21,22 @@ object AlertingActions { val INDEX_MONITOR_ACTION_TYPE = ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse) @JvmField + val INDEX_WORKFLOW_ACTION_TYPE = + ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse) + @JvmField val GET_ALERTS_ACTION_TYPE = ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse) @JvmField + val GET_WORKFLOW_ACTION_TYPE = + ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse) + + @JvmField val DELETE_MONITOR_ACTION_TYPE = ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse) @JvmField + val DELETE_WORKFLOW_ACTION_TYPE = + ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse) + @JvmField val GET_FINDINGS_ACTION_TYPE = ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse) @JvmField diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt new file mode 100644 index 00000000..4990f497 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt @@ -0,0 +1,38 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import java.io.IOException + +class DeleteWorkflowRequest : ActionRequest { + + val workflowId: String + /** + * Flag that indicates whether the delegate monitors should be deleted or not. + * If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other. + */ + val deleteDelegateMonitors: Boolean? + + constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() { + this.workflowId = workflowId + this.deleteDelegateMonitors = deleteDelegateMonitors + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + deleteDelegateMonitors = sin.readOptionalBoolean() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeOptionalBoolean(deleteDelegateMonitors) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt new file mode 100644 index 00000000..8da62c5e --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt @@ -0,0 +1,48 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder + +class DeleteWorkflowResponse : BaseResponse { + var id: String + var version: Long + var nonDeletedMonitors: List? = null + + constructor( + id: String, + version: Long, + nonDeletedMonitors: List? = null + ) : super() { + this.id = id + this.version = version + this.nonDeletedMonitors = nonDeletedMonitors + } + + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readOptionalStringList() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeOptionalStringCollection(nonDeletedMonitors) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .field(NON_DELETED_MONITORS, nonDeletedMonitors) + .endObject() + } + + companion object { + const val NON_DELETED_MONITORS = "NON_DELETED_MONITORS" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt new file mode 100644 index 00000000..1b7948cd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.rest.RestRequest +import java.io.IOException + +class GetWorkflowRequest : ActionRequest { + val workflowId: String + val method: RestRequest.Method + + constructor( + workflowId: String, + method: RestRequest.Method + ) : super() { + this.workflowId = workflowId + this.method = method + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // workflowId + sin.readEnum(RestRequest.Method::class.java) // method + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeEnum(method) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt new file mode 100644 index 00000000..f18550c5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM +import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.rest.RestStatus +import java.io.IOException + +class GetWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + private var status: RestStatus + var workflow: Workflow? + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + status: RestStatus, + workflow: Workflow? + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.status = status + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + sin.readEnum(RestStatus::class.java), // RestStatus + if (sin.readBoolean()) { + Workflow.readFrom(sin) // monitor + } else null + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeEnum(status) + if (workflow != null) { + out.writeBoolean(true) + workflow?.writeTo(out) + } else { + out.writeBoolean(false) + } + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + if (workflow != null) + builder.field("workflow", workflow) + + return builder.endObject() + } + + override fun getStatus(): RestStatus { + return this.status + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt new file mode 100644 index 00000000..15a895ca --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -0,0 +1,137 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.rest.RestRequest +import java.io.IOException +import java.util.stream.Collectors + +class IndexWorkflowRequest : ActionRequest { + val workflowId: String + val seqNo: Long + val primaryTerm: Long + val refreshPolicy: WriteRequest.RefreshPolicy + val method: RestRequest.Method + var workflow: Workflow + val rbacRoles: List? + + private val MAX_DELEGATE_SIZE = 25 + + constructor( + workflowId: String, + seqNo: Long, + primaryTerm: Long, + refreshPolicy: WriteRequest.RefreshPolicy, + method: RestRequest.Method, + workflow: Workflow, + rbacRoles: List? = null + ) : super() { + this.workflowId = workflowId + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.refreshPolicy = refreshPolicy + this.method = method + this.workflow = workflow + this.rbacRoles = rbacRoles + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin), + method = sin.readEnum(RestRequest.Method::class.java), + workflow = Workflow.readFrom(sin) as Workflow, + rbacRoles = sin.readOptionalStringList() + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + + if (workflow.inputs.isEmpty()) { + validationException = ValidateActions.addValidationError( + "Input list can not be empty.", validationException + ) + return validationException + } + if (workflow.inputs.size > 1) { + validationException = ValidateActions.addValidationError( + "Input list can contain only one element.", validationException + ) + return validationException + } + if (workflow.inputs[0] !is CompositeInput) { + validationException = ValidateActions.addValidationError( + "When creating a workflow input must be CompositeInput", validationException + ) + } + val compositeInput = workflow.inputs[0] as CompositeInput + val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) + + if (monitorIds.isNullOrEmpty()) { + validationException = ValidateActions.addValidationError( + "Delegates list can not be empty.", validationException + ) + // Break the flow because next checks are dependant on non-null monitorIds + return validationException + } + + if (monitorIds.size > MAX_DELEGATE_SIZE) { + validationException = ValidateActions.addValidationError( + "Delegates list can not be larger then $MAX_DELEGATE_SIZE.", validationException + ) + } + + if (monitorIds.toSet().size != monitorIds.size) { + validationException = ValidateActions.addValidationError( + "Duplicate delegates not allowed", validationException + ) + } + val delegates = compositeInput.sequence.delegates + val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) + if (orderSet.size != delegates.size) { + validationException = ValidateActions.addValidationError( + "Sequence ordering of delegate monitor shouldn't contain duplicate order values", validationException + ) + } + + val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } + delegates.forEach { + if (it.chainedMonitorFindings != null) { + if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence", + validationException + ) + // Break the flow because next check will generate the NPE + return validationException + } + if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) { + validationException = ValidateActions.addValidationError( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}", + validationException + ) + } + } + } + return validationException + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + refreshPolicy.writeTo(out) + out.writeEnum(method) + workflow.writeTo(out) + out.writeOptionalStringCollection(rbacRoles) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt new file mode 100644 index 00000000..89863ba5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt @@ -0,0 +1,61 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class IndexWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + var workflow: Workflow + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + workflow: Workflow + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + Workflow.readFrom(sin) as Workflow // workflow + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + workflow.writeTo(out) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .field(IndexUtils._SEQ_NO, seqNo) + .field(IndexUtils._PRIMARY_TERM, primaryTerm) + .field("workflow", workflow) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt new file mode 100644 index 00000000..cf2bafd6 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -0,0 +1,77 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException + +/** + * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. + */ +// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties +data class ChainedMonitorFindings( + val monitorId: String +) : BaseModel { + + init { + validateId(monitorId) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // monitorId + ) + + fun asTemplateArg(): Map { + return mapOf( + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(MONITOR_ID_FIELD, monitorId) + .endObject() + return builder + } + + companion object { + const val MONITOR_ID_FIELD = "monitor_id" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ChainedMonitorFindings { + lateinit var monitorId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + } + } + return ChainedMonitorFindings(monitorId) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ChainedMonitorFindings { + return ChainedMonitorFindings(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt new file mode 100644 index 00000000..229f20e2 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt @@ -0,0 +1,84 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.core.ParseField +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException + +data class CompositeInput( + val sequence: Sequence, +) : WorkflowInput { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + Sequence(sin) + ) + + fun asTemplateArg(): Map { + return mapOf( + SEQUENCE_FIELD to sequence + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + sequence.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(COMPOSITE_INPUT_FIELD) + .field(SEQUENCE_FIELD, sequence) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return COMPOSITE_INPUT_FIELD + } + + fun getMonitorIds(): List { + return sequence.delegates.map { delegate -> delegate.monitorId } + } + + companion object { + const val COMPOSITE_INPUT_FIELD = "composite_input" + const val SEQUENCE_FIELD = "sequence" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + WorkflowInput::class.java, + ParseField(COMPOSITE_INPUT_FIELD), CheckedFunction { CompositeInput.parse(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): CompositeInput { + var sequence = Sequence(emptyList()) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SEQUENCE_FIELD -> { + sequence = Sequence.parse(xcp) + } + } + } + + return CompositeInput(sequence) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): CompositeInput { + return CompositeInput(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt new file mode 100644 index 00000000..65158a68 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt @@ -0,0 +1,118 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException + +/** + * Each underlying monitors defined in the composite monitor sequence input. + * They are executed sequentially in the order mentioned. + * Optionally accepts chained findings context. + * */ +data class Delegate( + /** + * Defines the order of the monitor in delegate list + */ + val order: Int, + /** + * Id of the monitor + */ + val monitorId: String, + /** + * Keeps the track of the previously executed monitor in a chain list. + * Used for pre-filtering by getting the findings doc ids for the given monitor + */ + val chainedMonitorFindings: ChainedMonitorFindings? = null +) : BaseModel { + + init { + validateId(monitorId) + validateOrder(order) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + order = sin.readInt(), + monitorId = sin.readString(), + chainedMonitorFindings = if (sin.readBoolean()) { + ChainedMonitorFindings(sin) + } else null, + ) + + fun asTemplateArg(): Map { + return mapOf( + ORDER_FIELD to order, + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeInt(order) + out.writeString(monitorId) + out.writeBoolean(chainedMonitorFindings != null) + chainedMonitorFindings?.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(ORDER_FIELD, order) + .field(MONITOR_ID_FIELD, monitorId) + if (chainedMonitorFindings != null) { + builder.field(CHAINED_FINDINGS_FIELD, chainedMonitorFindings) + } + builder.endObject() + return builder + } + + companion object { + const val ORDER_FIELD = "order" + const val MONITOR_ID_FIELD = "monitor_id" + const val CHAINED_FINDINGS_FIELD = "chained_monitor_findings" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Delegate { + lateinit var monitorId: String + var order = 0 + var chainedMonitorFindings: ChainedMonitorFindings? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ORDER_FIELD -> { + order = xcp.intValue() + validateOrder(order) + } + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + CHAINED_FINDINGS_FIELD -> { + chainedMonitorFindings = ChainedMonitorFindings.parse(xcp) + } + } + } + return Delegate(order, monitorId, chainedMonitorFindings) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Delegate { + return Delegate(sin) + } + + fun validateOrder(order: Int) { + require(order > 0) { "Invalid delgate order" } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index 3b3b3976..d4e69498 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -22,9 +22,33 @@ class Finding( val monitorName: String, val index: String, val docLevelQueries: List, - val timestamp: Instant + val timestamp: Instant, + /** + * Keeps the track of the workflow-monitor exact execution. + * Used for filtering the data when chaining monitors in a workflow. + */ + val executionId: String? = null, ) : Writeable, ToXContent { + constructor( + id: String = NO_ID, + relatedDocIds: List, + monitorId: String, + monitorName: String, + index: String, + docLevelQueries: List, + timestamp: Instant + ) : this ( + id = id, + relatedDocIds = relatedDocIds, + monitorId = monitorId, + monitorName = monitorName, + index = index, + docLevelQueries = docLevelQueries, + timestamp = timestamp, + executionId = null + ) + @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), @@ -34,7 +58,8 @@ class Finding( monitorName = sin.readString(), index = sin.readString(), docLevelQueries = sin.readList((DocLevelQuery)::readFrom), - timestamp = sin.readInstant() + timestamp = sin.readInstant(), + executionId = sin.readOptionalString() ) fun asTemplateArg(): Map { @@ -46,7 +71,8 @@ class Finding( MONITOR_NAME_FIELD to monitorName, INDEX_FIELD to index, QUERIES_FIELD to docLevelQueries, - TIMESTAMP_FIELD to timestamp.toEpochMilli() + TIMESTAMP_FIELD to timestamp.toEpochMilli(), + EXECUTION_ID_FIELD to executionId ) } @@ -60,6 +86,7 @@ class Finding( .field(INDEX_FIELD, index) .field(QUERIES_FIELD, docLevelQueries.toTypedArray()) .field(TIMESTAMP_FIELD, timestamp.toEpochMilli()) + .field(EXECUTION_ID_FIELD, executionId) builder.endObject() return builder } @@ -74,6 +101,7 @@ class Finding( out.writeString(index) out.writeCollection(docLevelQueries) out.writeInstant(timestamp) + out.writeOptionalString(executionId) } companion object { @@ -85,6 +113,7 @@ class Finding( const val INDEX_FIELD = "index" const val QUERIES_FIELD = "queries" const val TIMESTAMP_FIELD = "timestamp" + const val EXECUTION_ID_FIELD = "execution_id" const val NO_ID = "" @JvmStatic @JvmOverloads @@ -98,6 +127,7 @@ class Finding( lateinit var index: String val queries: MutableList = mutableListOf() lateinit var timestamp: Instant + var executionId: String? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -130,6 +160,7 @@ class Finding( TIMESTAMP_FIELD -> { timestamp = requireNotNull(xcp.instant()) } + EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() } } @@ -141,7 +172,8 @@ class Finding( monitorName = monitorName, index = index, docLevelQueries = queries, - timestamp = timestamp + timestamp = timestamp, + executionId = executionId ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt new file mode 100644 index 00000000..108f4004 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt @@ -0,0 +1,75 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException + +/** Delegate monitors passed as input for composite monitors. */ +data class Sequence( + val delegates: List +) : BaseModel { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readList(::Delegate) + ) + + fun asTemplateArg(): Map { + return mapOf( + DELEGATES_FIELD to delegates, + ) + } + + companion object { + const val SEQUENCE_FIELD = "sequence" + const val DELEGATES_FIELD = "delegates" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Sequence { + val delegates: MutableList = mutableListOf() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + DELEGATES_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + delegates.add(Delegate.parse(xcp)) + } + } + } + } + return Sequence(delegates) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocLevelMonitorInput { + return DocLevelMonitorInput(sin) + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeCollection(delegates) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(DELEGATES_FIELD, delegates.toTypedArray()) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt new file mode 100644 index 00000000..fd563cc8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -0,0 +1,265 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.alerting.util.IndexUtils.Companion.WORKFLOW_MAX_INPUTS +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.commons.alerting.util.optionalUserField +import org.opensearch.commons.authuser.User +import org.opensearch.core.ParseField +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException +import java.time.Instant +import java.util.Locale + +data class Workflow( + override val id: String = NO_ID, + override val version: Long = NO_VERSION, + override val name: String, + override val enabled: Boolean, + override val schedule: Schedule, + override val lastUpdateTime: Instant, + override val enabledTime: Instant?, + // TODO: Check how this behaves during rolling upgrade/multi-version cluster + // Can read/write and parsing break if it's done from an old -> new version of the plugin? + val workflowType: WorkflowType, + val user: User?, + val schemaVersion: Int = NO_SCHEMA_VERSION, + val inputs: List, + val owner: String? = DEFAULT_OWNER +) : ScheduledJob { + override val type = WORKFLOW_TYPE + + init { + if (enabled) { + requireNotNull(enabledTime) + } else { + require(enabledTime == null) + } + require(inputs.size <= WORKFLOW_MAX_INPUTS) { "Workflows can only have $WORKFLOW_MAX_INPUTS search input." } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + name = sin.readString(), + enabled = sin.readBoolean(), + schedule = Schedule.readFrom(sin), + lastUpdateTime = sin.readInstant(), + enabledTime = sin.readOptionalInstant(), + workflowType = sin.readEnum(WorkflowType::class.java), + user = if (sin.readBoolean()) { + User(sin) + } else null, + schemaVersion = sin.readInt(), + inputs = sin.readList((WorkflowInput)::readFrom), + owner = sin.readOptionalString() + ) + + // This enum classifies different workflows + // This is different from 'type' which denotes the Scheduled Job type + enum class WorkflowType(val value: String) { + COMPOSITE("composite"); + + override fun toString(): String { + return value + } + } + + /** Returns a representation of the workflow suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf(_ID to id, _VERSION to version, NAME_FIELD to name, ENABLED_FIELD to enabled) + } + + fun toXContentWithUser(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, false) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, true) + } + + private fun createXContentBuilder( + builder: XContentBuilder, + params: ToXContent.Params, + secure: Boolean + ): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(type) + builder.field(TYPE_FIELD, type) + .field(SCHEMA_VERSION_FIELD, schemaVersion) + .field(NAME_FIELD, name) + .field(WORKFLOW_TYPE_FIELD, workflowType) + + if (!secure) { + builder.optionalUserField(USER_FIELD, user) + } + + builder.field(ENABLED_FIELD, enabled) + .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) + .field(SCHEDULE_FIELD, schedule) + .field(INPUTS_FIELD, inputs.toTypedArray()) + .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) + builder.field(OWNER_FIELD, owner) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + override fun fromDocument(id: String, version: Long): Workflow = copy(id = id, version = version) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeString(name) + out.writeBoolean(enabled) + if (schedule is CronSchedule) { + out.writeEnum(Schedule.TYPE.CRON) + } else { + out.writeEnum(Schedule.TYPE.INTERVAL) + } + schedule.writeTo(out) + out.writeInstant(lastUpdateTime) + out.writeOptionalInstant(enabledTime) + out.writeEnum(workflowType) + out.writeBoolean(user != null) + user?.writeTo(out) + out.writeInt(schemaVersion) + // Outputting type with each Input so that the generic Input.readFrom() can read it + out.writeVInt(inputs.size) + inputs.forEach { + if (it is CompositeInput) out.writeEnum(WorkflowInput.Type.COMPOSITE_INPUT) + it.writeTo(out) + } + // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it + out.writeOptionalString(owner) + } + + companion object { + const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" + const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" + const val WORKFLOW_TYPE = "workflow" + const val TYPE_FIELD = "type" + const val WORKFLOW_TYPE_FIELD = "workflow_type" + const val SCHEMA_VERSION_FIELD = "schema_version" + const val NAME_FIELD = "name" + const val USER_FIELD = "user" + const val ENABLED_FIELD = "enabled" + const val SCHEDULE_FIELD = "schedule" + const val NO_ID = "" + const val NO_VERSION = 1L + const val INPUTS_FIELD = "inputs" + const val LAST_UPDATE_TIME_FIELD = "last_update_time" + const val ENABLED_TIME_FIELD = "enabled_time" + const val OWNER_FIELD = "owner" + + // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all + // the different subclasses and creating circular dependencies + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + ScheduledJob::class.java, + ParseField(WORKFLOW_TYPE), + CheckedFunction { parse(it) } + ) + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Workflow { + var name: String? = null + var workflowType: String = WorkflowType.COMPOSITE.toString() + var user: User? = null + var schedule: Schedule? = null + var lastUpdateTime: Instant? = null + var enabledTime: Instant? = null + var enabled = true + var schemaVersion = NO_SCHEMA_VERSION + val inputs: MutableList = mutableListOf() + var owner = DEFAULT_OWNER + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() + NAME_FIELD -> name = xcp.text() + WORKFLOW_TYPE_FIELD -> { + workflowType = xcp.text() + val allowedTypes = WorkflowType.values().map { it.value } + if (!allowedTypes.contains(workflowType)) { + throw IllegalStateException("Workflow type should be one of $allowedTypes") + } + } + USER_FIELD -> { + user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) + } + ENABLED_FIELD -> enabled = xcp.booleanValue() + SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + INPUTS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + val input = WorkflowInput.parse(xcp) + inputs.add(input) + } + } + ENABLED_TIME_FIELD -> enabledTime = xcp.instant() + LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() + OWNER_FIELD -> { + owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() + } + else -> { + xcp.skipChildren() + } + } + } + + if (enabled && enabledTime == null) { + enabledTime = Instant.now() + } else if (!enabled) { + enabledTime = null + } + return Workflow( + id, + version, + requireNotNull(name) { "Workflow name is null" }, + enabled, + requireNotNull(schedule) { "Workflow schedule is null" }, + lastUpdateTime ?: Instant.now(), + enabledTime, + WorkflowType.valueOf(workflowType.uppercase(Locale.ROOT)), + user, + schemaVersion, + inputs.toList(), + owner + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Workflow? { + return Workflow(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + + private const val DEFAULT_OWNER = "alerting" + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt new file mode 100644 index 00000000..bdead75a --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt @@ -0,0 +1,48 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException + +interface WorkflowInput : BaseModel { + + enum class Type(val value: String) { + COMPOSITE_INPUT(CompositeInput.COMPOSITE_INPUT_FIELD); + + override fun toString(): String { + return value + } + } + + companion object { + + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowInput { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val input = if (xcp.currentName() == Type.COMPOSITE_INPUT.value) { + CompositeInput.parse(xcp) + } else { + throw IllegalStateException("Unexpected input type when reading Input") + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + return input + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowInput { + return when (val type = sin.readEnum(Type::class.java)) { + Type.COMPOSITE_INPUT -> CompositeInput(sin) + // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns + // enum can be null in Java + else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") + } + } + } + + fun name(): String +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index 803ab675..51f9be52 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -14,6 +14,7 @@ class IndexUtils { const val NO_SCHEMA_VERSION = 0 const val MONITOR_MAX_INPUTS = 1 + const val WORKFLOW_MAX_INPUTS = 1 const val MONITOR_MAX_TRIGGERS = 10 diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 62e425d9..287bd990 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -14,17 +14,28 @@ import org.opensearch.action.ActionType import org.opensearch.client.node.NodeClient import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest +import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestStatus import org.opensearch.search.SearchModule @@ -55,6 +66,31 @@ internal class AlertingPluginInterfaceTests { Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + @Test + fun indexWorkflow() { + val workflow = randomWorkflow() + + val request = mock(IndexWorkflowRequest::class.java) + val response = IndexWorkflowResponse( + Workflow.NO_ID, + Workflow.NO_VERSION, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + workflow + ) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.indexWorkflow(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + @Test fun indexBucketMonitor() { val monitor = randomBucketLevelMonitor() @@ -89,6 +125,38 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.deleteMonitor(client, request, listener) } + @Test + fun deleteWorkflow() { + val request = mock(DeleteWorkflowRequest::class.java) + val response = DeleteWorkflowResponse(Workflow.NO_ID, Workflow.NO_VERSION) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.deleteWorkflow(client, request, listener) + } + + @Test + fun getWorkflow() { + val request = mock(GetWorkflowRequest::class.java) + val response = GetWorkflowResponse( + id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = randomWorkflow() + ) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.getWorkflow(client, request, listener) + } + @Test fun getAlerts() { val monitor = randomQueryLevelMonitor() @@ -129,4 +197,33 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.getFindings(client, request, listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + + @Test + fun publishFindings() { + val request = mock(PublishFindingsRequest::class.java) + val response = SubscribeFindingsResponse(status = RestStatus.OK) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.publishFinding(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + + @Test + fun acknowledgeAlerts() { + val request = mock(AcknowledgeAlertRequest::class.java) + val response = AcknowledgeAlertResponse(acknowledged = listOf(), failed = listOf(), missing = listOf()) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.acknowledgeAlerts(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 685898ec..5979f858 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -21,7 +21,10 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -33,7 +36,10 @@ import org.opensearch.commons.alerting.model.NoOpTrigger import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Sequence import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.WorkflowInput import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -156,6 +162,73 @@ fun randomDocumentLevelMonitor( ) } +fun randomWorkflow( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User? = randomUser(), + monitorIds: List? = null, + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + val delegates = mutableListOf() + if (!monitorIds.isNullOrEmpty()) { + delegates.add(Delegate(1, monitorIds[0])) + for (i in 1 until monitorIds.size) { + // Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding + delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1]))) + } + } + var input = listOf(CompositeInput(Sequence(delegates))) + if (input == null) { + input = listOf( + CompositeInput( + Sequence( + listOf(Delegate(1, "delegate1")) + ) + ) + ) + } + return Workflow( + name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, + schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + ) +} + +fun randomWorkflowWithDelegates( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User? = randomUser(), + input: List, + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + return Workflow( + name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, + schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + ) +} + +fun Workflow.toJsonStringWithUser(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() +} + +fun randomSequence( + delegates: List = listOf(randomDelegate()) +): Sequence { + return Sequence(delegates) +} + +fun randomDelegate( + order: Int = 1, + monitorId: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + chainedMonitorFindings: ChainedMonitorFindings? = null +): Delegate { + return Delegate(order, monitorId, chainedMonitorFindings) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), @@ -303,6 +376,11 @@ fun randomClusterMetricsInput( return ClusterMetricsInput(path, pathParams, url) } +fun Workflow.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() +} + fun Monitor.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt new file mode 100644 index 00000000..8ebe91f3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt @@ -0,0 +1,23 @@ +package org.opensearch.commons.alerting.action + +import org.junit.Assert +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput + +class DeleteWorkflowRequestTests { + + @Test + fun `test delete workflow request`() { + + val req = DeleteWorkflowRequest("1234", true) + Assert.assertNotNull(req) + Assert.assertEquals("1234", req.workflowId) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = DeleteWorkflowRequest(sin) + Assert.assertEquals("1234", newReq.workflowId) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponseTests.kt new file mode 100644 index 00000000..299925f4 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponseTests.kt @@ -0,0 +1,25 @@ +package org.opensearch.commons.alerting.action + +import org.junit.Assert +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput + +class DeleteWorkflowResponseTests { + + @Test + fun `test delete workflow response`() { + + val res = DeleteWorkflowResponse(id = "w1", version = 1, nonDeletedMonitors = listOf("m1")) + Assert.assertNotNull(res) + Assert.assertEquals("w1", res.id) + + val out = BytesStreamOutput() + res.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRes = DeleteWorkflowResponse(sin) + Assert.assertEquals("w1", newRes.id) + Assert.assertEquals("m1", newRes.nonDeletedMonitors!!.get(0)) + Assert.assertEquals(1, newRes.version) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequestTests.kt new file mode 100644 index 00000000..c9e8ebca --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequestTests.kt @@ -0,0 +1,23 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.rest.RestRequest + +class GetWorkflowRequestTests { + + @Test + fun testGetWorkflowRequest() { + val request = GetWorkflowRequest("w1", RestRequest.Method.GET) + Assertions.assertNull(request.validate()) + + val out = BytesStreamOutput() + request.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetWorkflowRequest(sin) + Assertions.assertEquals("w1", newReq.workflowId) + Assertions.assertEquals(RestRequest.Method.GET, newReq.method) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponseTests.kt new file mode 100644 index 00000000..91a22fc4 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponseTests.kt @@ -0,0 +1,26 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.randomWorkflow +import org.opensearch.rest.RestStatus + +class GetWorkflowResponseTests { + + @Test + fun testGetWorkflowRequest() { + val workflow = randomWorkflow() + val response = GetWorkflowResponse( + id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = workflow + ) + val out = BytesStreamOutput() + response.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newRes = GetWorkflowResponse(sin) + Assertions.assertEquals("id", newRes.id) + Assertions.assertEquals(workflow.name, newRes.workflow!!.name) + Assertions.assertEquals(workflow.owner, newRes.workflow!!.owner) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt new file mode 100644 index 00000000..d25cd5b2 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -0,0 +1,210 @@ +package org.opensearch.commons.alerting.action + +import org.junit.Assert +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.model.ChainedMonitorFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.Sequence +import org.opensearch.commons.alerting.randomWorkflow +import org.opensearch.commons.alerting.randomWorkflowWithDelegates +import org.opensearch.commons.utils.recreateObject +import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule +import java.lang.Exception +import java.lang.IllegalArgumentException +import java.util.UUID + +class IndexWorkflowRequestTests { + + @Test + fun `test index workflow post request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `test index composite workflow post request`() { + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + val newReq = IndexWorkflowRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry)) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `Index composite workflow serialize and deserialize transport object should be equal`() { + val compositeWorkflowRequest = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomWorkflow() + ) + + val recreatedObject = recreateObject( + compositeWorkflowRequest, + NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + ) { IndexWorkflowRequest(it) } + Assertions.assertEquals(compositeWorkflowRequest.workflowId, recreatedObject.workflowId) + Assertions.assertEquals(compositeWorkflowRequest.seqNo, recreatedObject.seqNo) + Assertions.assertEquals(compositeWorkflowRequest.primaryTerm, recreatedObject.primaryTerm) + Assertions.assertEquals(compositeWorkflowRequest.method, recreatedObject.method) + Assertions.assertNotNull(recreatedObject.workflow) + Assertions.assertEquals(compositeWorkflowRequest.workflow, recreatedObject.workflow) + } + + @Test + fun `test index workflow put request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.PUT, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `test validate`() { + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow(monitorIds = emptyList()) + ) + Assertions.assertNotNull(req) + // Empty input list + var validate = req.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be empty.;")) + // Duplicate delegate + val req1 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow(monitorIds = listOf("1L", "1L", "2L")) + ) + validate = req1.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Duplicate delegates not allowed")) + // Sequence not correct + var delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val req2 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req2.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) + // Chained finding sequence not correct + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + val req3 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req3.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-x doesn't exist in sequence")) + // Order not correct + delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + val req4 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates))) + ) + ) + validate = req4.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3")) + // Max monitor size + val monitorsIds = mutableListOf() + for (i in 0..25) { + monitorsIds.add(UUID.randomUUID().toString()) + } + val req5 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflow( + monitorIds = monitorsIds + ) + ) + validate = req5.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Delegates list can not be larger then 25.")) + // Input list empty + val req6 = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = emptyList() + ) + ) + validate = req6.validate() + Assert.assertTrue(validate != null) + Assert.assertTrue(validate!!.message!!.contains("Input list can not be empty.")) + // Input list multiple elements + delegates = listOf( + Delegate(1, "monitor-1") + ) + try { + IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomWorkflowWithDelegates( + input = listOf(CompositeInput(Sequence(delegates = delegates)), CompositeInput(Sequence(delegates = delegates))) + ) + ) + } catch (ex: Exception) { + Assert.assertTrue(ex is IllegalArgumentException) + Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input.")) + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt new file mode 100644 index 00000000..523f5650 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt @@ -0,0 +1,45 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.model.CronSchedule +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.randomUser +import java.time.Instant +import java.time.ZoneId + +class IndexWorkflowResponseTests { + + @Test + fun `test index workflow response with workflow`() { + val cronExpression = "31 * * * *" // Run at minute 31. + val testInstance = Instant.ofEpochSecond(1538164858L) + + val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) + val workflow = Workflow( + id = "123", + version = 0L, + name = "test-workflow", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + workflowType = Workflow.WorkflowType.COMPOSITE, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + ) + val req = IndexWorkflowResponse("1234", 1L, 2L, 0L, workflow) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowResponse(sin) + Assertions.assertEquals("1234", newReq.id) + Assertions.assertEquals(1L, newReq.version) + Assertions.assertNotNull(newReq.workflow) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt new file mode 100644 index 00000000..9680bdbe --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -0,0 +1,86 @@ +package org.opensearch.commons.alerting.model + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.commons.alerting.randomDelegate +import org.opensearch.commons.alerting.randomSequence + +class CompositeInputTests { + @Test + fun `test sequence asTemplateArgs`() { + val sequence = randomSequence() + // WHEN + val templateArgs = sequence.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Sequence.DELEGATES_FIELD], + sequence.delegates, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test delegate asTemplateArgs`() { + val delegate = randomDelegate() + // WHEN + val templateArgs = delegate.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Delegate.ORDER_FIELD], + delegate.order, + "Template args 'id' field does not match:" + ) + Assertions.assertEquals( + templateDelegates[Delegate.MONITOR_ID_FIELD], + delegate.monitorId, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test create Delegate with illegal order value`() { + try { + randomDelegate(-1) + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + Assertions.assertEquals( + "Invalid delgate order", + e.message + ) + } + } + + @Test + fun `test create Delegate with illegal monitorId value`() { + try { + randomDelegate(1, "") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } + + @Test + fun `test create Chained Findings with illegal monitorId value`() { + try { + ChainedMonitorFindings("") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt index 9f5e26b9..83ff4014 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/WriteableTests.kt @@ -16,6 +16,7 @@ import org.opensearch.commons.alerting.randomQueryLevelTrigger import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty +import org.opensearch.commons.alerting.randomWorkflow import org.opensearch.commons.authuser.User import org.opensearch.search.builder.SearchSourceBuilder @@ -81,6 +82,16 @@ class WriteableTests { Assertions.assertEquals(monitor, newMonitor, "Round tripping QueryLevelMonitor doesn't work") } + @Test + fun `test workflow as stream`() { + val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3", "4")) + val out = BytesStreamOutput() + workflow.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newWorkflow = Workflow(sin) + Assertions.assertEquals(newWorkflow, workflow, "Round tripping Workflow failed") + } + @Test fun `test query-level trigger as stream`() { val trigger = randomQueryLevelTrigger() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index f8c98842..a96f967c 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -22,6 +22,7 @@ import org.opensearch.commons.alerting.randomQueryLevelTrigger import org.opensearch.commons.alerting.randomThrottle import org.opensearch.commons.alerting.randomUser import org.opensearch.commons.alerting.randomUserEmpty +import org.opensearch.commons.alerting.randomWorkflow import org.opensearch.commons.alerting.toJsonString import org.opensearch.commons.alerting.toJsonStringWithUser import org.opensearch.commons.alerting.util.string @@ -160,6 +161,14 @@ class XContentTests { Assertions.assertEquals(monitor, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") } + @Test + fun `test composite workflow parsing`() { + val workflow = randomWorkflow() + val monitorString = workflow.toJsonStringWithUser() + val parsedMonitor = Workflow.parse(parser(monitorString)) + Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") + } + @Test fun `test query-level trigger parsing`() { val trigger = randomQueryLevelTrigger() @@ -229,6 +238,15 @@ class XContentTests { Assertions.assertNull(parsedMonitor.user) } + @Test + fun `test workflow parsing`() { + val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3")) + + val monitorString = workflow.toJsonString() + val parsedWorkflow = Workflow.parse(parser(monitorString)) + Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed") + } + @Test fun `test old monitor format parsing`() { val monitorString = """