Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added workflow as a composite monitor #380

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.commons.alerting.action.GetWorkflowResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject

Expand Down Expand Up @@ -53,6 +59,7 @@ object AlertingPluginInterface {
}
)
}

fun deleteMonitor(
client: NodeClient,
request: DeleteMonitorRequest,
Expand All @@ -71,6 +78,49 @@ object AlertingPluginInterface {
)
}

/**
* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexWorkflow(
client: NodeClient,
request: IndexWorkflowRequest,
listener: ActionListener<IndexWorkflowResponse>
) {
client.execute(
AlertingActions.INDEX_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
IndexWorkflowResponse(
it
)
}
}
)
}

fun deleteWorkflow(
client: NodeClient,
request: DeleteWorkflowRequest,
listener: ActionListener<DeleteWorkflowResponse>
) {
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
DeleteWorkflowResponse(
it
)
}
}
)
}

/**
* Get Alerts interface.
* @param client Node client for making transport action
Expand All @@ -95,6 +145,30 @@ object AlertingPluginInterface {
)
}

/**
* Get Workflow interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun getWorkflow(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also add execute workflow

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could - but then we need to extract the ExecuteWorkflowRequest.
Should I add a execute workflow method here or?

client: NodeClient,
request: GetWorkflowRequest,
listener: ActionListener<GetWorkflowResponse>
) {
client.execute(
AlertingActions.GET_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
GetWorkflowResponse(
it
)
}
}
)
}

/**
* Get Findings interface.
* @param client Node client for making transport action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,34 @@ import org.opensearch.action.ActionType

object AlertingActions {
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write"
const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get"
const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get"
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete"
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse)
@JvmField
val INDEX_WORKFLOW_ACTION_TYPE =
ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse)
@JvmField
val GET_ALERTS_ACTION_TYPE =
ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse)
@JvmField
val GET_WORKFLOW_ACTION_TYPE =
ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse)

@JvmField
val DELETE_MONITOR_ACTION_TYPE =
ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse)
@JvmField
val DELETE_WORKFLOW_ACTION_TYPE =
ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse)
@JvmField
val GET_FINDINGS_ACTION_TYPE =
ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse)
@JvmField
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

class DeleteWorkflowRequest : ActionRequest {

val workflowId: String
val deleteDelegateMonitors: Boolean?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java docs

val refreshPolicy: WriteRequest.RefreshPolicy
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove refresh policy
we shoudl alway set it as IMMEDIATE and not take input from user


constructor(workflowId: String, deleteDelegateMonitors: Boolean?, refreshPolicy: WriteRequest.RefreshPolicy) : super() {
this.workflowId = workflowId
this.deleteDelegateMonitors = deleteDelegateMonitors
this.refreshPolicy = refreshPolicy
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
workflowId = sin.readString(),
deleteDelegateMonitors = sin.readOptionalBoolean(),
refreshPolicy = WriteRequest.RefreshPolicy.readFrom(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeOptionalBoolean(deleteDelegateMonitors)
refreshPolicy.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.commons.alerting.util.IndexUtils
import org.opensearch.commons.notifications.action.BaseResponse

class DeleteWorkflowResponse : BaseResponse {
var id: String
var version: Long

constructor(
id: String,
version: Long
) : super() {
this.id = id
this.version = version
}

constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong() // version
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(IndexUtils._ID, id)
.field(IndexUtils._VERSION, version)
.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.rest.RestRequest
import org.opensearch.search.fetch.subphase.FetchSourceContext
import java.io.IOException

class GetWorkflowRequest : ActionRequest {
val workflowId: String
val version: Long
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove version from request. User should be able to fetch by id

val method: RestRequest.Method
val srcContext: FetchSourceContext?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why FetchSourceContext type and not a specific data model?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh - it's copy/paste from GetMonitorRequest


constructor(
workflowId: String,
version: Long,
method: RestRequest.Method,
srcContext: FetchSourceContext?
) : super() {
this.workflowId = workflowId
this.version = version
this.method = method
this.srcContext = srcContext
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // workflowId
sin.readLong(), // version
sin.readEnum(RestRequest.Method::class.java), // method
if (sin.readBoolean()) {
FetchSourceContext(sin) // srcContext
} else null
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeLong(version)
out.writeEnum(method)
out.writeBoolean(srcContext != null)
srcContext?.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.rest.RestStatus
import java.io.IOException

class GetWorkflowResponse : BaseResponse {
var id: String
var version: Long
var seqNo: Long
var primaryTerm: Long
private var status: RestStatus
var workflow: Workflow?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
status: RestStatus,
workflow: Workflow?
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.status = status
this.workflow = workflow
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong(), // version
sin.readLong(), // seqNo
sin.readLong(), // primaryTerm
sin.readEnum(RestStatus::class.java), // RestStatus
if (sin.readBoolean()) {
Workflow.readFrom(sin) // monitor
} else null
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
out.writeEnum(status)
if (workflow != null) {
out.writeBoolean(true)
workflow?.writeTo(out)
} else {
out.writeBoolean(false)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (workflow != null)
builder.field("workflow", workflow)

return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
Loading