Skip to content

Commit

Permalink
Added Alerting workflow model (opensearch-project#380)
Browse files Browse the repository at this point in the history
Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored and eirsep committed May 24, 2023
1 parent db60532 commit 5abb5c7
Show file tree
Hide file tree
Showing 23 changed files with 1,476 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.commons.alerting.action.GetWorkflowResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject

Expand Down Expand Up @@ -55,6 +61,7 @@ object AlertingPluginInterface {
}
)
}

fun deleteMonitor(
client: NodeClient,
request: DeleteMonitorRequest,
Expand All @@ -73,6 +80,49 @@ object AlertingPluginInterface {
)
}

/**
* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexWorkflow(
client: NodeClient,
request: IndexWorkflowRequest,
listener: ActionListener<IndexWorkflowResponse>
) {
client.execute(
AlertingActions.INDEX_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
IndexWorkflowResponse(
it
)
}
}
)
}

fun deleteWorkflow(
client: NodeClient,
request: DeleteWorkflowRequest,
listener: ActionListener<DeleteWorkflowResponse>
) {
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
DeleteWorkflowResponse(
it
)
}
}
)
}

/**
* Get Alerts interface.
* @param client Node client for making transport action
Expand All @@ -97,6 +147,30 @@ object AlertingPluginInterface {
)
}

/**
* Get Workflow interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun getWorkflow(
client: NodeClient,
request: GetWorkflowRequest,
listener: ActionListener<GetWorkflowResponse>
) {
client.execute(
AlertingActions.GET_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
GetWorkflowResponse(
it
)
}
}
)
}

/**
* Get Findings interface.
* @param client Node client for making transport action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import org.opensearch.action.ActionType

object AlertingActions {
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write"
const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get"
const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get"
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete"
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
Expand All @@ -19,14 +22,24 @@ object AlertingActions {
ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse)

@JvmField
val INDEX_WORKFLOW_ACTION_TYPE =
ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse)
@JvmField
val GET_ALERTS_ACTION_TYPE =
ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse)

@JvmField
val GET_WORKFLOW_ACTION_TYPE =
ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse)

@JvmField
val DELETE_MONITOR_ACTION_TYPE =
ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse)

@JvmField
val DELETE_WORKFLOW_ACTION_TYPE =
ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse)
@JvmField
val GET_FINDINGS_ACTION_TYPE =
ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

class DeleteWorkflowRequest : ActionRequest {

val workflowId: String
/**
* Flag that indicates whether the delegate monitors should be deleted or not.
* If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other.
*/
val deleteDelegateMonitors: Boolean?

constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() {
this.workflowId = workflowId
this.deleteDelegateMonitors = deleteDelegateMonitors
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
workflowId = sin.readString(),
deleteDelegateMonitors = sin.readOptionalBoolean()
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeOptionalBoolean(deleteDelegateMonitors)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.commons.alerting.util.IndexUtils
import org.opensearch.commons.notifications.action.BaseResponse

class DeleteWorkflowResponse : BaseResponse {
var id: String
var version: Long

constructor(
id: String,
version: Long
) : super() {
this.id = id
this.version = version
}

constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong() // version
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(IndexUtils._ID, id)
.field(IndexUtils._VERSION, version)
.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.rest.RestRequest
import java.io.IOException

class GetWorkflowRequest : ActionRequest {
val workflowId: String
val method: RestRequest.Method

constructor(
workflowId: String,
method: RestRequest.Method
) : super() {
this.workflowId = workflowId
this.method = method
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // workflowId
sin.readEnum(RestRequest.Method::class.java) // method
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeEnum(method)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.rest.RestStatus
import java.io.IOException

class GetWorkflowResponse : BaseResponse {
var id: String
var version: Long
var seqNo: Long
var primaryTerm: Long
private var status: RestStatus
var workflow: Workflow?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
status: RestStatus,
workflow: Workflow?
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.status = status
this.workflow = workflow
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong(), // version
sin.readLong(), // seqNo
sin.readLong(), // primaryTerm
sin.readEnum(RestStatus::class.java), // RestStatus
if (sin.readBoolean()) {
Workflow.readFrom(sin) // monitor
} else null
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
out.writeEnum(status)
if (workflow != null) {
out.writeBoolean(true)
workflow?.writeTo(out)
} else {
out.writeBoolean(false)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (workflow != null)
builder.field("workflow", workflow)

return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
Loading

0 comments on commit 5abb5c7

Please sign in to comment.