From 8233e7f5ded52dd6867e33558fd41d5907904ad7 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Mon, 13 Mar 2023 23:09:54 +0100 Subject: [PATCH 1/4] Added workflow as a composite monitor Signed-off-by: Stevan Buzejic --- .../alerting/AlertingPluginInterface.kt | 74 +++++ .../alerting/action/AlertingActions.kt | 13 + .../alerting/action/DeleteWorkflowRequest.kt | 39 +++ .../alerting/action/DeleteWorkflowResponse.kt | 38 +++ .../alerting/action/GetWorkflowRequest.kt | 56 ++++ .../alerting/action/GetWorkflowResponse.kt | 88 ++++++ .../alerting/action/IndexWorkflowRequest.kt | 64 +++++ .../alerting/action/IndexWorkflowResponse.kt | 61 ++++ .../commons/alerting/model/ChainedFindings.kt | 76 +++++ .../commons/alerting/model/CompositeInput.kt | 84 ++++++ .../commons/alerting/model/Delegate.kt | 108 +++++++ .../commons/alerting/model/Finding.kt | 36 ++- .../commons/alerting/model/Sequence.kt | 75 +++++ .../commons/alerting/model/Workflow.kt | 264 ++++++++++++++++++ .../commons/alerting/model/WorkflowInput.kt | 48 ++++ .../commons/alerting/util/IndexUtils.kt | 1 + .../alerting/AlertingPluginInterfaceTests.kt | 45 +++ .../commons/alerting/TestHelpers.kt | 50 ++++ .../action/DeleteWorkflowRequestTests.kt | 26 ++ .../action/IndexWorkflowRequestTests.kt | 96 +++++++ .../action/IndexWorkflowResponseTests.kt | 45 +++ .../alerting/model/CompositeInputTests.kt | 86 ++++++ .../commons/alerting/model/XContentTests.kt | 9 + 23 files changed, 1478 insertions(+), 4 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt create mode 100644 src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt create mode 100644 src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 05c60be8..18a4ba76 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.commons.utils.recreateObject @@ -53,6 +59,7 @@ object AlertingPluginInterface { } ) } + fun deleteMonitor( client: NodeClient, request: DeleteMonitorRequest, @@ -71,6 +78,49 @@ object AlertingPluginInterface { ) } + /** + * Index monitor interface. + * @param client Node client for making transport action + * @param request The request object + * @param namedWriteableRegistry Registry for building aggregations + * @param listener The listener for getting response + */ + fun indexWorkflow( + client: NodeClient, + request: IndexWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + IndexWorkflowResponse( + it + ) + } + } + ) + } + + fun deleteWorkflow( + client: NodeClient, + request: DeleteWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + DeleteWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Alerts interface. * @param client Node client for making transport action @@ -95,6 +145,30 @@ object AlertingPluginInterface { ) } + /** + * Get Workflow interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun getWorkflow( + client: NodeClient, + request: GetWorkflowRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.GET_WORKFLOW_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + GetWorkflowResponse( + it + ) + } + } + ) + } + /** * Get Findings interface. * @param client Node client for making transport action diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index 23a5ce77..e834199e 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -8,8 +8,11 @@ import org.opensearch.action.ActionType object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" + const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write" const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get" + const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get" const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete" + const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete" const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get" const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack" @@ -17,12 +20,22 @@ object AlertingActions { val INDEX_MONITOR_ACTION_TYPE = ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse) @JvmField + val INDEX_WORKFLOW_ACTION_TYPE = + ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse) + @JvmField val GET_ALERTS_ACTION_TYPE = ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse) @JvmField + val GET_WORKFLOW_ACTION_TYPE = + ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse) + + @JvmField val DELETE_MONITOR_ACTION_TYPE = ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse) @JvmField + val DELETE_WORKFLOW_ACTION_TYPE = + ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse) + @JvmField val GET_FINDINGS_ACTION_TYPE = ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse) @JvmField diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt new file mode 100644 index 00000000..cd93b372 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt @@ -0,0 +1,39 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import java.io.IOException + +class DeleteWorkflowRequest : ActionRequest { + + val workflowId: String + val deleteDelegateMonitors: Boolean? + val refreshPolicy: WriteRequest.RefreshPolicy + + constructor(workflowId: String, deleteDelegateMonitors: Boolean?, refreshPolicy: WriteRequest.RefreshPolicy) : super() { + this.workflowId = workflowId + this.deleteDelegateMonitors = deleteDelegateMonitors + this.refreshPolicy = refreshPolicy + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + deleteDelegateMonitors = sin.readOptionalBoolean(), + refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin) + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeOptionalBoolean(deleteDelegateMonitors) + refreshPolicy.writeTo(out) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt new file mode 100644 index 00000000..3aab42ef --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowResponse.kt @@ -0,0 +1,38 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse + +class DeleteWorkflowResponse : BaseResponse { + var id: String + var version: Long + + constructor( + id: String, + version: Long + ) : super() { + this.id = id + this.version = version + } + + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong() // version + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt new file mode 100644 index 00000000..435b1deb --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.rest.RestRequest +import org.opensearch.search.fetch.subphase.FetchSourceContext +import java.io.IOException + +class GetWorkflowRequest : ActionRequest { + val workflowId: String + val version: Long + val method: RestRequest.Method + val srcContext: FetchSourceContext? + + constructor( + workflowId: String, + version: Long, + method: RestRequest.Method, + srcContext: FetchSourceContext? + ) : super() { + this.workflowId = workflowId + this.version = version + this.method = method + this.srcContext = srcContext + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // workflowId + sin.readLong(), // version + sin.readEnum(RestRequest.Method::class.java), // method + if (sin.readBoolean()) { + FetchSourceContext(sin) // srcContext + } else null + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeLong(version) + out.writeEnum(method) + out.writeBoolean(srcContext != null) + srcContext?.writeTo(out) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt new file mode 100644 index 00000000..21d0f9c4 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowResponse.kt @@ -0,0 +1,88 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM +import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.rest.RestStatus +import java.io.IOException + +class GetWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + private var status: RestStatus + var workflow: Workflow? + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + status: RestStatus, + workflow: Workflow? + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.status = status + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + sin.readEnum(RestStatus::class.java), // RestStatus + if (sin.readBoolean()) { + Workflow.readFrom(sin) // monitor + } else null + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + out.writeEnum(status) + if (workflow != null) { + out.writeBoolean(true) + workflow?.writeTo(out) + } else { + out.writeBoolean(false) + } + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + if (workflow != null) + builder.field("workflow", workflow) + + return builder.endObject() + } + + override fun getStatus(): RestStatus { + return this.status + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt new file mode 100644 index 00000000..88fbe3ed --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequest.kt @@ -0,0 +1,64 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.rest.RestRequest +import java.io.IOException + +class IndexWorkflowRequest : ActionRequest { + val workflowId: String + val seqNo: Long + val primaryTerm: Long + val refreshPolicy: WriteRequest.RefreshPolicy + val method: RestRequest.Method + var workflow: Workflow + val rbacRoles: List? + + constructor( + workflowId: String, + seqNo: Long, + primaryTerm: Long, + refreshPolicy: WriteRequest.RefreshPolicy, + method: RestRequest.Method, + workflow: Workflow, + rbacRoles: List? = null + ) : super() { + this.workflowId = workflowId + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.refreshPolicy = refreshPolicy + this.method = method + this.workflow = workflow + this.rbacRoles = rbacRoles + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + workflowId = sin.readString(), + seqNo = sin.readLong(), + primaryTerm = sin.readLong(), + refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin), + method = sin.readEnum(RestRequest.Method::class.java), + workflow = Workflow.readFrom(sin) as Workflow, + rbacRoles = sin.readOptionalStringList() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(workflowId) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + refreshPolicy.writeTo(out) + out.writeEnum(method) + workflow.writeTo(out) + out.writeOptionalStringCollection(rbacRoles) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt new file mode 100644 index 00000000..15c9f904 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponse.kt @@ -0,0 +1,61 @@ +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.IndexUtils +import org.opensearch.commons.notifications.action.BaseResponse +import java.io.IOException + +class IndexWorkflowResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + var workflow: Workflow + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + workflow: Workflow + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // id + sin.readLong(), // version + sin.readLong(), // seqNo + sin.readLong(), // primaryTerm + Workflow.readFrom(sin) as Workflow // workflow + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + workflow.writeTo(out) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(IndexUtils._ID, id) + .field(IndexUtils._VERSION, version) + .field(IndexUtils._SEQ_NO, seqNo) + .field(IndexUtils._PRIMARY_TERM, primaryTerm) + .field("workflow", workflow) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt new file mode 100644 index 00000000..ac401a75 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt @@ -0,0 +1,76 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import java.io.IOException + +/** + * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. + */ +data class ChainedFindings( + val monitorId: String +) : BaseModel { + + init { + validateId(monitorId) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // monitorId + ) + + fun asTemplateArg(): Map { + return mapOf( + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(MONITOR_ID_FIELD, monitorId) + .endObject() + return builder + } + + companion object { + const val MONITOR_ID_FIELD = "monitor_id" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): ChainedFindings { + lateinit var monitorId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + } + } + return ChainedFindings(monitorId) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ChainedFindings { + return ChainedFindings(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt new file mode 100644 index 00000000..3535c90c --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/CompositeInput.kt @@ -0,0 +1,84 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import java.io.IOException + +data class CompositeInput( + val sequence: Sequence +) : WorkflowInput { + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + Sequence(sin) + ) + + fun asTemplateArg(): Map { + return mapOf( + SEQUENCE_FIELD to sequence + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + sequence.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .startObject(COMPOSITE_INPUT_FIELD) + .field(SEQUENCE_FIELD, sequence) + .endObject() + .endObject() + return builder + } + + override fun name(): String { + return COMPOSITE_INPUT_FIELD + } + + fun getMonitorIds(): List { + return sequence.delegates.map { delegate -> delegate.monitorId } + } + + companion object { + const val COMPOSITE_INPUT_FIELD = "composite_input" + const val SEQUENCE_FIELD = "sequence" + + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + WorkflowInput::class.java, + ParseField(COMPOSITE_INPUT_FIELD), CheckedFunction { CompositeInput.parse(it) } + ) + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): CompositeInput { + var sequence = Sequence(emptyList()) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SEQUENCE_FIELD -> { + sequence = Sequence.parse(xcp) + } + } + } + + return CompositeInput(sequence) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): CompositeInput { + return CompositeInput(sin) + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt new file mode 100644 index 00000000..5b258454 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt @@ -0,0 +1,108 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import org.opensearch.commons.utils.validateId +import java.io.IOException + +/** + * Each underlying monitors defined in the composite monitor sequence input. + * They are executed sequentially in the order mentioned. + * Optionally accepts chained findings context. + * */ +data class Delegate( + val order: Int, + val monitorId: String, + val chainedFindings: ChainedFindings? = null +) : BaseModel { + + init { + validateId(monitorId) + validateOrder(order) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + order = sin.readInt(), + monitorId = sin.readString(), + chainedFindings = if (sin.readBoolean()) { + ChainedFindings(sin) + } else null, + ) + + fun asTemplateArg(): Map { + return mapOf( + ORDER_FIELD to order, + MONITOR_ID_FIELD to monitorId, + ) + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeInt(order) + out.writeString(monitorId) + out.writeBoolean(chainedFindings != null) + chainedFindings?.writeTo(out) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(ORDER_FIELD, order) + .field(MONITOR_ID_FIELD, monitorId) + if (chainedFindings != null) { + builder.field(CHAINED_FINDINGS_FIELD, chainedFindings) + } + builder.endObject() + return builder + } + + companion object { + const val ORDER_FIELD = "order" + const val MONITOR_ID_FIELD = "monitor_id" + const val CHAINED_FINDINGS_FIELD = "chained_findings" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Delegate { + lateinit var monitorId: String + var order = 0 + var chainedFindings: ChainedFindings? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + ORDER_FIELD -> { + order = xcp.intValue() + validateOrder(order) + } + MONITOR_ID_FIELD -> { + monitorId = xcp.text() + validateId(monitorId) + } + CHAINED_FINDINGS_FIELD -> { + chainedFindings = ChainedFindings.parse(xcp) + } + } + } + return Delegate(order, monitorId, chainedFindings) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Delegate { + return Delegate(sin) + } + + fun validateOrder(order: Int) { + require(order > 0) { "Invalid delgate order" } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index 899189b8..4ccaa58f 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -21,9 +21,29 @@ class Finding( val monitorName: String, val index: String, val docLevelQueries: List, - val timestamp: Instant + val timestamp: Instant, + val executionId: String? = null, ) : Writeable, ToXContent { + constructor( + id: String = NO_ID, + relatedDocIds: List, + monitorId: String, + monitorName: String, + index: String, + docLevelQueries: List, + timestamp: Instant + ) : this ( + id = id, + relatedDocIds = relatedDocIds, + monitorId = monitorId, + monitorName = monitorName, + index = index, + docLevelQueries = docLevelQueries, + timestamp = timestamp, + executionId = null + ) + @Throws(IOException::class) constructor(sin: StreamInput) : this( id = sin.readString(), @@ -32,7 +52,8 @@ class Finding( monitorName = sin.readString(), index = sin.readString(), docLevelQueries = sin.readList((DocLevelQuery)::readFrom), - timestamp = sin.readInstant() + timestamp = sin.readInstant(), + executionId = sin.readOptionalString() ) fun asTemplateArg(): Map { @@ -43,7 +64,8 @@ class Finding( MONITOR_NAME_FIELD to monitorName, INDEX_FIELD to index, QUERIES_FIELD to docLevelQueries, - TIMESTAMP_FIELD to timestamp.toEpochMilli() + TIMESTAMP_FIELD to timestamp.toEpochMilli(), + EXECUTION_ID_FIELD to executionId ) } @@ -56,6 +78,7 @@ class Finding( .field(INDEX_FIELD, index) .field(QUERIES_FIELD, docLevelQueries.toTypedArray()) .field(TIMESTAMP_FIELD, timestamp.toEpochMilli()) + .field(EXECUTION_ID_FIELD, executionId) builder.endObject() return builder } @@ -69,6 +92,7 @@ class Finding( out.writeString(index) out.writeCollection(docLevelQueries) out.writeInstant(timestamp) + out.writeOptionalString(executionId) } companion object { @@ -79,6 +103,7 @@ class Finding( const val INDEX_FIELD = "index" const val QUERIES_FIELD = "queries" const val TIMESTAMP_FIELD = "timestamp" + const val EXECUTION_ID_FIELD = "execution_id" const val NO_ID = "" @JvmStatic @JvmOverloads @@ -91,6 +116,7 @@ class Finding( lateinit var index: String val queries: MutableList = mutableListOf() lateinit var timestamp: Instant + var executionId: String? = null ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -117,6 +143,7 @@ class Finding( TIMESTAMP_FIELD -> { timestamp = requireNotNull(xcp.instant()) } + EXECUTION_ID_FIELD -> executionId = xcp.textOrNull() } } @@ -127,7 +154,8 @@ class Finding( monitorName = monitorName, index = index, docLevelQueries = queries, - timestamp = timestamp + timestamp = timestamp, + executionId = executionId ) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt new file mode 100644 index 00000000..d8ce97a8 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Sequence.kt @@ -0,0 +1,75 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import java.io.IOException + +/** Delegate monitors passed as input for composite monitors. */ +data class Sequence( + val delegates: List +) : BaseModel { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readList(::Delegate) + ) + + fun asTemplateArg(): Map { + return mapOf( + DELEGATES_FIELD to delegates, + ) + } + + companion object { + const val SEQUENCE_FIELD = "sequence" + const val DELEGATES_FIELD = "delegates" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser): Sequence { + val delegates: MutableList = mutableListOf() + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + DELEGATES_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + delegates.add(Delegate.parse(xcp)) + } + } + } + } + return Sequence(delegates) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): DocLevelMonitorInput { + return DocLevelMonitorInput(sin) + } + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeCollection(delegates) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(DELEGATES_FIELD, delegates.toTypedArray()) + .endObject() + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt new file mode 100644 index 00000000..5b446f91 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -0,0 +1,264 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.CheckedFunction +import org.opensearch.common.ParseField +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION +import org.opensearch.commons.alerting.util.IndexUtils.Companion.WORKFLOW_MAX_INPUTS +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.commons.alerting.util.optionalUserField +import org.opensearch.commons.authuser.User +import java.io.IOException +import java.time.Instant +import java.util.Locale + +data class Workflow( + override val id: String = NO_ID, + override val version: Long = NO_VERSION, + override val name: String, + override val enabled: Boolean, + override val schedule: Schedule, + override val lastUpdateTime: Instant, + override val enabledTime: Instant?, + // TODO: Check how this behaves during rolling upgrade/multi-version cluster + // Can read/write and parsing break if it's done from an old -> new version of the plugin? + val workflowType: WorkflowType, + val user: User?, + val schemaVersion: Int = NO_SCHEMA_VERSION, + val inputs: List, + val owner: String? = "alerting" +) : ScheduledJob { + + override val type = WORKFLOW_TYPE + + init { + if (enabled) { + requireNotNull(enabledTime) + } else { + require(enabledTime == null) + } + require(inputs.size <= WORKFLOW_MAX_INPUTS) { "Workflows can only have $WORKFLOW_MAX_INPUTS search input." } + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + version = sin.readLong(), + name = sin.readString(), + enabled = sin.readBoolean(), + schedule = Schedule.readFrom(sin), + lastUpdateTime = sin.readInstant(), + enabledTime = sin.readOptionalInstant(), + workflowType = sin.readEnum(WorkflowType::class.java), + user = if (sin.readBoolean()) { + User(sin) + } else null, + schemaVersion = sin.readInt(), + inputs = sin.readList((WorkflowInput)::readFrom), + owner = sin.readOptionalString() + ) + + // This enum classifies different workflows + // This is different from 'type' which denotes the Scheduled Job type + enum class WorkflowType(val value: String) { + COMPOSITE("composite"); + + override fun toString(): String { + return value + } + } + + /** Returns a representation of the workflow suitable for passing into painless and mustache scripts. */ + fun asTemplateArg(): Map { + return mapOf(_ID to id, _VERSION to version, NAME_FIELD to name, ENABLED_FIELD to enabled) + } + + fun toXContentWithUser(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, false) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return createXContentBuilder(builder, params, true) + } + + private fun createXContentBuilder( + builder: XContentBuilder, + params: ToXContent.Params, + secure: Boolean + ): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(type) + builder.field(TYPE_FIELD, type) + .field(SCHEMA_VERSION_FIELD, schemaVersion) + .field(NAME_FIELD, name) + .field(WORKFLOW_TYPE_FIELD, workflowType) + + if (!secure) { + builder.optionalUserField(USER_FIELD, user) + } + + builder.field(ENABLED_FIELD, enabled) + .optionalTimeField(ENABLED_TIME_FIELD, enabledTime) + .field(SCHEDULE_FIELD, schedule) + .field(INPUTS_FIELD, inputs.toTypedArray()) + .optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime) + builder.field(OWNER_FIELD, owner) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + override fun fromDocument(id: String, version: Long): Workflow = copy(id = id, version = version) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeString(name) + out.writeBoolean(enabled) + if (schedule is CronSchedule) { + out.writeEnum(Schedule.TYPE.CRON) + } else { + out.writeEnum(Schedule.TYPE.INTERVAL) + } + schedule.writeTo(out) + out.writeInstant(lastUpdateTime) + out.writeOptionalInstant(enabledTime) + out.writeEnum(workflowType) + out.writeBoolean(user != null) + user?.writeTo(out) + out.writeInt(schemaVersion) + // Outputting type with each Input so that the generic Input.readFrom() can read it + out.writeVInt(inputs.size) + inputs.forEach { + if (it is CompositeInput) out.writeEnum(WorkflowInput.Type.COMPOSITE_INPUT) + it.writeTo(out) + } + // Outputting type with each Trigger so that the generic Trigger.readFrom() can read it + out.writeOptionalString(owner) + } + + companion object { + const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" + const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" + const val WORKFLOW_TYPE = "workflow" + const val TYPE_FIELD = "type" + const val WORKFLOW_TYPE_FIELD = "workflow_type" + const val SCHEMA_VERSION_FIELD = "schema_version" + const val NAME_FIELD = "name" + const val USER_FIELD = "user" + const val ENABLED_FIELD = "enabled" + const val SCHEDULE_FIELD = "schedule" + const val NO_ID = "" + const val NO_VERSION = 1L + const val INPUTS_FIELD = "inputs" + const val LAST_UPDATE_TIME_FIELD = "last_update_time" + const val ENABLED_TIME_FIELD = "enabled_time" + const val OWNER_FIELD = "owner" + + // This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all + // the different subclasses and creating circular dependencies + val XCONTENT_REGISTRY = NamedXContentRegistry.Entry( + ScheduledJob::class.java, + ParseField(WORKFLOW_TYPE), + CheckedFunction { parse(it) } + ) + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Workflow { + var name: String? = null + var workflowType: String = WorkflowType.COMPOSITE.toString() + var user: User? = null + var schedule: Schedule? = null + var lastUpdateTime: Instant? = null + var enabledTime: Instant? = null + var enabled = true + var schemaVersion = NO_SCHEMA_VERSION + val inputs: MutableList = mutableListOf() + var owner = "alerting" + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue() + NAME_FIELD -> name = xcp.text() + WORKFLOW_TYPE_FIELD -> { + workflowType = xcp.text() + val allowedTypes = WorkflowType.values().map { it.value } + if (!allowedTypes.contains(workflowType)) { + throw IllegalStateException("Workflow type should be one of $allowedTypes") + } + } + USER_FIELD -> { + user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp) + } + ENABLED_FIELD -> enabled = xcp.booleanValue() + SCHEDULE_FIELD -> schedule = Schedule.parse(xcp) + INPUTS_FIELD -> { + XContentParserUtils.ensureExpectedToken( + XContentParser.Token.START_ARRAY, + xcp.currentToken(), + xcp + ) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + val input = WorkflowInput.parse(xcp) + inputs.add(input) + } + } + ENABLED_TIME_FIELD -> enabledTime = xcp.instant() + LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant() + OWNER_FIELD -> { + owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text() + } + else -> { + xcp.skipChildren() + } + } + } + + if (enabled && enabledTime == null) { + enabledTime = Instant.now() + } else if (!enabled) { + enabledTime = null + } + return Workflow( + id, + version, + requireNotNull(name) { "Workflow name is null" }, + enabled, + requireNotNull(schedule) { "Workflow schedule is null" }, + lastUpdateTime ?: Instant.now(), + enabledTime, + WorkflowType.valueOf(workflowType.uppercase(Locale.ROOT)), + user, + schemaVersion, + inputs.toList(), + owner + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): Workflow? { + return Workflow(sin) + } + + @Suppress("UNCHECKED_CAST") + fun suppressWarning(map: MutableMap?): MutableMap { + return map as MutableMap + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt new file mode 100644 index 00000000..682271eb --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/WorkflowInput.kt @@ -0,0 +1,48 @@ +package org.opensearch.commons.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.notifications.model.BaseModel +import java.io.IOException + +interface WorkflowInput : BaseModel { + + enum class Type(val value: String) { + COMPOSITE_INPUT(CompositeInput.COMPOSITE_INPUT_FIELD); + + override fun toString(): String { + return value + } + } + + companion object { + + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowInput { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val input = if (xcp.currentName() == Type.COMPOSITE_INPUT.value) { + CompositeInput.parse(xcp) + } else { + throw IllegalStateException("Unexpected input type when reading Input") + } + XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp) + return input + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowInput { + return when (val type = sin.readEnum(Type::class.java)) { + Type.COMPOSITE_INPUT -> CompositeInput(sin) + // This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns + // enum can be null in Java + else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger") + } + } + } + + fun name(): String +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt index eef89a0a..0a6628b8 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/util/IndexUtils.kt @@ -14,6 +14,7 @@ class IndexUtils { const val NO_SCHEMA_VERSION = 0 const val MONITOR_MAX_INPUTS = 1 + const val WORKFLOW_MAX_INPUTS = 1 const val MONITOR_MAX_TRIGGERS = 10 diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index 62e425d9..acce04c1 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -16,15 +16,20 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestStatus import org.opensearch.search.SearchModule @@ -55,6 +60,31 @@ internal class AlertingPluginInterfaceTests { Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + @Test + fun indexWorkflow() { + val workflow = randomCompositeWorkflow() + + val request = mock(IndexWorkflowRequest::class.java) + val response = IndexWorkflowResponse( + Workflow.NO_ID, + Workflow.NO_VERSION, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + workflow + ) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.indexWorkflow(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + @Test fun indexBucketMonitor() { val monitor = randomBucketLevelMonitor() @@ -89,6 +119,21 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.deleteMonitor(client, request, listener) } + @Test + fun deleteWorkflow() { + val request = mock(DeleteWorkflowRequest::class.java) + val response = DeleteWorkflowResponse(Workflow.NO_ID, Workflow.NO_VERSION) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + AlertingPluginInterface.deleteWorkflow(client, request, listener) + } + @Test fun getAlerts() { val monitor = randomQueryLevelMonitor() diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 6ab056c5..335f9fe9 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -25,7 +25,10 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -36,7 +39,10 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Sequence import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.WorkflowInput import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -155,6 +161,50 @@ fun randomDocumentLevelMonitor( ) } +fun randomCompositeWorkflow( + name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + user: User? = randomUser(), + inputs: List? = null, + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = Random().nextBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + var input = inputs + if (input == null) { + input = listOf( + CompositeInput( + Sequence( + listOf(Delegate(1, "delegate1")) + ) + ) + ) + } + return Workflow( + name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input, + schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, + ) +} + +fun Workflow.toJsonStringWithUser(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string() +} + +fun randomSequence( + delegates: List = listOf(randomDelegate()) +): Sequence { + return Sequence(delegates) +} + +fun randomDelegate( + order: Int = 1, + monitorId: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), + chainedFindings: ChainedFindings? = null +): Delegate { + return Delegate(order, monitorId, chainedFindings) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt new file mode 100644 index 00000000..5fefbd7a --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt @@ -0,0 +1,26 @@ +package org.opensearch.commons.alerting.action + +import org.junit.Assert +import org.junit.Test +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput + +class DeleteWorkflowRequestTests { + + @Test + fun `test delete workflow request`() { + + val req = DeleteWorkflowRequest("1234", true, WriteRequest.RefreshPolicy.IMMEDIATE) + Assert.assertNotNull(req) + Assert.assertEquals("1234", req.workflowId) + Assert.assertEquals("true", req.refreshPolicy.value) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = DeleteWorkflowRequest(sin) + Assert.assertEquals("1234", newReq.workflowId) + Assert.assertEquals("true", newReq.refreshPolicy.value) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt new file mode 100644 index 00000000..a3a94166 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowRequestTests.kt @@ -0,0 +1,96 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.randomCompositeWorkflow +import org.opensearch.commons.utils.recreateObject +import org.opensearch.rest.RestRequest +import org.opensearch.search.SearchModule + +class IndexWorkflowRequestTests { + + @Test + fun `test index workflow post request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `test index composite workflow post request`() { + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + val newReq = IndexWorkflowRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry)) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.POST, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } + + @Test + fun `Index composite workflow serialize and deserialize transport object should be equal`() { + val compositeWorkflowRequest = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST, + randomCompositeWorkflow() + ) + + val recreatedObject = recreateObject( + compositeWorkflowRequest, + NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables) + ) { IndexWorkflowRequest(it) } + Assertions.assertEquals(compositeWorkflowRequest.workflowId, recreatedObject.workflowId) + Assertions.assertEquals(compositeWorkflowRequest.seqNo, recreatedObject.seqNo) + Assertions.assertEquals(compositeWorkflowRequest.primaryTerm, recreatedObject.primaryTerm) + Assertions.assertEquals(compositeWorkflowRequest.method, recreatedObject.method) + Assertions.assertNotNull(recreatedObject.workflow) + Assertions.assertEquals(compositeWorkflowRequest.workflow, recreatedObject.workflow) + } + + @Test + fun `test index workflow put request`() { + + val req = IndexWorkflowRequest( + "1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT, + randomCompositeWorkflow() + ) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowRequest(sin) + Assertions.assertEquals("1234", newReq.workflowId) + Assertions.assertEquals(1L, newReq.seqNo) + Assertions.assertEquals(2L, newReq.primaryTerm) + Assertions.assertEquals(RestRequest.Method.PUT, newReq.method) + Assertions.assertNotNull(newReq.workflow) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt new file mode 100644 index 00000000..523f5650 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/IndexWorkflowResponseTests.kt @@ -0,0 +1,45 @@ +package org.opensearch.commons.alerting.action + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.commons.alerting.model.CronSchedule +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.randomUser +import java.time.Instant +import java.time.ZoneId + +class IndexWorkflowResponseTests { + + @Test + fun `test index workflow response with workflow`() { + val cronExpression = "31 * * * *" // Run at minute 31. + val testInstance = Instant.ofEpochSecond(1538164858L) + + val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) + val workflow = Workflow( + id = "123", + version = 0L, + name = "test-workflow", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + workflowType = Workflow.WorkflowType.COMPOSITE, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + ) + val req = IndexWorkflowResponse("1234", 1L, 2L, 0L, workflow) + Assertions.assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = IndexWorkflowResponse(sin) + Assertions.assertEquals("1234", newReq.id) + Assertions.assertEquals(1L, newReq.version) + Assertions.assertNotNull(newReq.workflow) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt new file mode 100644 index 00000000..af38ac04 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -0,0 +1,86 @@ +package org.opensearch.commons.alerting.model + +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.Test +import org.opensearch.commons.alerting.randomDelegate +import org.opensearch.commons.alerting.randomSequence + +class CompositeInputTests { + @Test + fun `test sequence asTemplateArgs`() { + val sequence = randomSequence() + // WHEN + val templateArgs = sequence.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Sequence.DELEGATES_FIELD], + sequence.delegates, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test delegate asTemplateArgs`() { + val delegate = randomDelegate() + // WHEN + val templateArgs = delegate.asTemplateArg() + + // THEN + val templateDelegates = templateArgs + Assertions.assertEquals( + templateDelegates[Delegate.ORDER_FIELD], + delegate.order, + "Template args 'id' field does not match:" + ) + Assertions.assertEquals( + templateDelegates[Delegate.MONITOR_ID_FIELD], + delegate.monitorId, + "Template args 'id' field does not match:" + ) + } + + @Test + fun `test create Delegate with illegal order value`() { + try { + randomDelegate(-1) + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + Assertions.assertEquals( + "Invalid delgate order", + e.message + ) + } + } + + @Test + fun `test create Delegate with illegal monitorId value`() { + try { + randomDelegate(1, "") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } + + @Test + fun `test create Chained Findings with illegal monitorId value`() { + try { + ChainedFindings("") + Assertions.fail("Expecting an illegal argument exception") + } catch (e: IllegalArgumentException) { + e.message?.let { + Assertions.assertTrue( + it.contains("Invalid characters in id") + + ) + } + } + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt index aa3b6e6f..ac07c1c3 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/XContentTests.kt @@ -17,6 +17,7 @@ import org.opensearch.commons.alerting.randomActionWithPolicy import org.opensearch.commons.alerting.randomAlert import org.opensearch.commons.alerting.randomBucketLevelMonitor import org.opensearch.commons.alerting.randomBucketLevelTrigger +import org.opensearch.commons.alerting.randomCompositeWorkflow import org.opensearch.commons.alerting.randomQueryLevelMonitor import org.opensearch.commons.alerting.randomQueryLevelMonitorWithoutUser import org.opensearch.commons.alerting.randomQueryLevelTrigger @@ -159,6 +160,14 @@ class XContentTests { Assertions.assertEquals(monitor, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") } + @Test + fun `test composite workflow parsing`() { + val workflow = randomCompositeWorkflow() + val monitorString = workflow.toJsonStringWithUser() + val parsedMonitor = Workflow.parse(parser(monitorString)) + Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work") + } + @Test fun `test query-level trigger parsing`() { val trigger = randomQueryLevelTrigger() From a2a2ede32da619cfce5900432ad2c0329d98d142 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Wed, 15 Mar 2023 20:13:49 +0100 Subject: [PATCH 2/4] Removed property from workflow. Added comments and aligned code according to conversation on the PR Signed-off-by: Stevan Buzejic --- .../alerting/action/GetWorkflowRequest.kt | 13 ++---------- ...dFindings.kt => ChainedMonitorFindings.kt} | 11 +++++----- .../commons/alerting/model/Delegate.kt | 20 +++++++++---------- .../commons/alerting/model/Workflow.kt | 7 ++++--- .../commons/alerting/TestHelpers.kt | 6 +++--- .../alerting/model/CompositeInputTests.kt | 2 +- 6 files changed, 26 insertions(+), 33 deletions(-) rename src/main/kotlin/org/opensearch/commons/alerting/model/{ChainedFindings.kt => ChainedMonitorFindings.kt} (83%) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt index 435b1deb..fd0ba157 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt @@ -10,35 +10,28 @@ import org.opensearch.action.ActionRequestValidationException import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.rest.RestRequest -import org.opensearch.search.fetch.subphase.FetchSourceContext import java.io.IOException class GetWorkflowRequest : ActionRequest { val workflowId: String val version: Long val method: RestRequest.Method - val srcContext: FetchSourceContext? constructor( workflowId: String, version: Long, - method: RestRequest.Method, - srcContext: FetchSourceContext? + method: RestRequest.Method ) : super() { this.workflowId = workflowId this.version = version this.method = method - this.srcContext = srcContext } @Throws(IOException::class) constructor(sin: StreamInput) : this( sin.readString(), // workflowId sin.readLong(), // version - sin.readEnum(RestRequest.Method::class.java), // method - if (sin.readBoolean()) { - FetchSourceContext(sin) // srcContext - } else null + sin.readEnum(RestRequest.Method::class.java) // method ) override fun validate(): ActionRequestValidationException? { @@ -50,7 +43,5 @@ class GetWorkflowRequest : ActionRequest { out.writeString(workflowId) out.writeLong(version) out.writeEnum(method) - out.writeBoolean(srcContext != null) - srcContext?.writeTo(out) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt similarity index 83% rename from src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt rename to src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt index ac401a75..689096dd 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedFindings.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/ChainedMonitorFindings.kt @@ -13,7 +13,8 @@ import java.io.IOException /** * Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id. */ -data class ChainedFindings( +// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties +data class ChainedMonitorFindings( val monitorId: String ) : BaseModel { @@ -49,7 +50,7 @@ data class ChainedFindings( @JvmStatic @Throws(IOException::class) - fun parse(xcp: XContentParser): ChainedFindings { + fun parse(xcp: XContentParser): ChainedMonitorFindings { lateinit var monitorId: String XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) @@ -64,13 +65,13 @@ data class ChainedFindings( } } } - return ChainedFindings(monitorId) + return ChainedMonitorFindings(monitorId) } @JvmStatic @Throws(IOException::class) - fun readFrom(sin: StreamInput): ChainedFindings { - return ChainedFindings(sin) + fun readFrom(sin: StreamInput): ChainedMonitorFindings { + return ChainedMonitorFindings(sin) } } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt index 5b258454..592bdc1d 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt @@ -18,7 +18,7 @@ import java.io.IOException data class Delegate( val order: Int, val monitorId: String, - val chainedFindings: ChainedFindings? = null + val chainedMonitorFindings: ChainedMonitorFindings? = null ) : BaseModel { init { @@ -30,8 +30,8 @@ data class Delegate( constructor(sin: StreamInput) : this( order = sin.readInt(), monitorId = sin.readString(), - chainedFindings = if (sin.readBoolean()) { - ChainedFindings(sin) + chainedMonitorFindings = if (sin.readBoolean()) { + ChainedMonitorFindings(sin) } else null, ) @@ -46,16 +46,16 @@ data class Delegate( override fun writeTo(out: StreamOutput) { out.writeInt(order) out.writeString(monitorId) - out.writeBoolean(chainedFindings != null) - chainedFindings?.writeTo(out) + out.writeBoolean(chainedMonitorFindings != null) + chainedMonitorFindings?.writeTo(out) } override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.startObject() .field(ORDER_FIELD, order) .field(MONITOR_ID_FIELD, monitorId) - if (chainedFindings != null) { - builder.field(CHAINED_FINDINGS_FIELD, chainedFindings) + if (chainedMonitorFindings != null) { + builder.field(CHAINED_FINDINGS_FIELD, chainedMonitorFindings) } builder.endObject() return builder @@ -71,7 +71,7 @@ data class Delegate( fun parse(xcp: XContentParser): Delegate { lateinit var monitorId: String var order = 0 - var chainedFindings: ChainedFindings? = null + var chainedMonitorFindings: ChainedMonitorFindings? = null XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -88,11 +88,11 @@ data class Delegate( validateId(monitorId) } CHAINED_FINDINGS_FIELD -> { - chainedFindings = ChainedFindings.parse(xcp) + chainedMonitorFindings = ChainedMonitorFindings.parse(xcp) } } } - return Delegate(order, monitorId, chainedFindings) + return Delegate(order, monitorId, chainedMonitorFindings) } @JvmStatic diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt index 5b446f91..a4235fb4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt @@ -35,9 +35,8 @@ data class Workflow( val user: User?, val schemaVersion: Int = NO_SCHEMA_VERSION, val inputs: List, - val owner: String? = "alerting" + val owner: String? = DEFAULT_OWNER ) : ScheduledJob { - override val type = WORKFLOW_TYPE init { @@ -185,7 +184,7 @@ data class Workflow( var enabled = true var schemaVersion = NO_SCHEMA_VERSION val inputs: MutableList = mutableListOf() - var owner = "alerting" + var owner = DEFAULT_OWNER XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { @@ -260,5 +259,7 @@ data class Workflow( fun suppressWarning(map: MutableMap?): MutableMap { return map as MutableMap } + + private const val DEFAULT_OWNER = "alerting" } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt index 335f9fe9..6129b1b4 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt @@ -25,7 +25,7 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger -import org.opensearch.commons.alerting.model.ChainedFindings +import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.Delegate @@ -200,9 +200,9 @@ fun randomSequence( fun randomDelegate( order: Int = 1, monitorId: String = RandomStrings.randomAsciiLettersOfLength(Random(), 10), - chainedFindings: ChainedFindings? = null + chainedMonitorFindings: ChainedMonitorFindings? = null ): Delegate { - return Delegate(order, monitorId, chainedFindings) + return Delegate(order, monitorId, chainedMonitorFindings) } fun randomQueryLevelTrigger( diff --git a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt index af38ac04..9680bdbe 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/model/CompositeInputTests.kt @@ -72,7 +72,7 @@ class CompositeInputTests { @Test fun `test create Chained Findings with illegal monitorId value`() { try { - ChainedFindings("") + ChainedMonitorFindings("") Assertions.fail("Expecting an illegal argument exception") } catch (e: IllegalArgumentException) { e.message?.let { From b66c0dcc1d3f38393fa5f0a34f69755f569c7770 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 16 Mar 2023 18:52:57 +0100 Subject: [PATCH 3/4] Added java docs for workflow related fields and classes. Removed unused properties Signed-off-by: Stevan Buzejic --- .../alerting/action/DeleteWorkflowRequest.kt | 13 ++++++------- .../commons/alerting/action/GetWorkflowRequest.kt | 5 ----- .../opensearch/commons/alerting/model/Delegate.kt | 10 ++++++++++ .../opensearch/commons/alerting/model/Finding.kt | 4 ++++ .../alerting/action/DeleteWorkflowRequestTests.kt | 5 +---- 5 files changed, 21 insertions(+), 16 deletions(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt index cd93b372..4990f497 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequest.kt @@ -2,7 +2,6 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException -import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import java.io.IOException @@ -10,20 +9,21 @@ import java.io.IOException class DeleteWorkflowRequest : ActionRequest { val workflowId: String + /** + * Flag that indicates whether the delegate monitors should be deleted or not. + * If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other. + */ val deleteDelegateMonitors: Boolean? - val refreshPolicy: WriteRequest.RefreshPolicy - constructor(workflowId: String, deleteDelegateMonitors: Boolean?, refreshPolicy: WriteRequest.RefreshPolicy) : super() { + constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() { this.workflowId = workflowId this.deleteDelegateMonitors = deleteDelegateMonitors - this.refreshPolicy = refreshPolicy } @Throws(IOException::class) constructor(sin: StreamInput) : this( workflowId = sin.readString(), - deleteDelegateMonitors = sin.readOptionalBoolean(), - refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin) + deleteDelegateMonitors = sin.readOptionalBoolean() ) override fun validate(): ActionRequestValidationException? { @@ -34,6 +34,5 @@ class DeleteWorkflowRequest : ActionRequest { override fun writeTo(out: StreamOutput) { out.writeString(workflowId) out.writeOptionalBoolean(deleteDelegateMonitors) - refreshPolicy.writeTo(out) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt index fd0ba157..1b7948cd 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetWorkflowRequest.kt @@ -14,23 +14,19 @@ import java.io.IOException class GetWorkflowRequest : ActionRequest { val workflowId: String - val version: Long val method: RestRequest.Method constructor( workflowId: String, - version: Long, method: RestRequest.Method ) : super() { this.workflowId = workflowId - this.version = version this.method = method } @Throws(IOException::class) constructor(sin: StreamInput) : this( sin.readString(), // workflowId - sin.readLong(), // version sin.readEnum(RestRequest.Method::class.java) // method ) @@ -41,7 +37,6 @@ class GetWorkflowRequest : ActionRequest { @Throws(IOException::class) override fun writeTo(out: StreamOutput) { out.writeString(workflowId) - out.writeLong(version) out.writeEnum(method) } } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt index 592bdc1d..a446409e 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Delegate.kt @@ -16,8 +16,18 @@ import java.io.IOException * Optionally accepts chained findings context. * */ data class Delegate( + /** + * Defines the order of the monitor in delegate list + */ val order: Int, + /** + * Id of the monitor + */ val monitorId: String, + /** + * Keeps the track of the previously executed monitor in a chain list. + * Used for pre-filtering by getting the findings doc ids for the given monitor + */ val chainedMonitorFindings: ChainedMonitorFindings? = null ) : BaseModel { diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index 4ccaa58f..913ef5b4 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -22,6 +22,10 @@ class Finding( val index: String, val docLevelQueries: List, val timestamp: Instant, + /** + * Keeps the track of the workflow-monitor exact execution. + * Used for filtering the data in chained findings. + */ val executionId: String? = null, ) : Writeable, ToXContent { diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt index 5fefbd7a..20bcb27e 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/DeleteWorkflowRequestTests.kt @@ -2,7 +2,6 @@ package org.opensearch.commons.alerting.action import org.junit.Assert import org.junit.Test -import org.opensearch.action.support.WriteRequest import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput @@ -11,16 +10,14 @@ class DeleteWorkflowRequestTests { @Test fun `test delete workflow request`() { - val req = DeleteWorkflowRequest("1234", true, WriteRequest.RefreshPolicy.IMMEDIATE) + val req = DeleteWorkflowRequest("1234", true) Assert.assertNotNull(req) Assert.assertEquals("1234", req.workflowId) - Assert.assertEquals("true", req.refreshPolicy.value) val out = BytesStreamOutput() req.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newReq = DeleteWorkflowRequest(sin) Assert.assertEquals("1234", newReq.workflowId) - Assert.assertEquals("true", newReq.refreshPolicy.value) } } From 6e2f32ff643be5767ad2a18bda0f8ce500d0e15d Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 16 Mar 2023 19:09:30 +0100 Subject: [PATCH 4/4] Updated finding comment Signed-off-by: Stevan Buzejic --- .../kotlin/org/opensearch/commons/alerting/model/Finding.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt index 913ef5b4..6881fd79 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt @@ -24,7 +24,7 @@ class Finding( val timestamp: Instant, /** * Keeps the track of the workflow-monitor exact execution. - * Used for filtering the data in chained findings. + * Used for filtering the data when chaining monitors in a workflow. */ val executionId: String? = null, ) : Writeable, ToXContent {