Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rest layer for workflows (Backport #963) #965

Merged
merged 1 commit into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand Down Expand Up @@ -127,7 +130,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")

@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"

@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"

@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
Expand Down Expand Up @@ -171,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestAcknowledgeAlertAction(),
Expand All @@ -181,7 +185,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetFindingsAction()
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import java.io.IOException

/**
* This class consists of the REST handler to delete workflows.
*/
class RestDeleteWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "delete_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.DELETE,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false)
log.debug("${request.method()} ${request.uri()}")

val refreshPolicy =
WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value))
val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors)

return RestChannelConsumer { channel ->
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest,
RestToXContentListener(channel)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.context
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import org.opensearch.search.fetch.subphase.FetchSourceContext

/**
* This class consists of the REST handler to retrieve a workflow .
*/
class RestGetWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "get_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.GET,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
if (workflowId == null || workflowId.isEmpty()) {
throw IllegalArgumentException("missing id")
}

var srcContext = context(request)
if (request.method() == RestRequest.Method.HEAD) {
srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
val getWorkflowRequest =
GetWorkflowRequest(workflowId, request.method())
return RestChannelConsumer {
channel ->
client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.alerting.resthandler

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestChannel
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant

/**
* Rest handlers to create and update workflows.
*/
class RestIndexWorkflowAction : BaseRestHandler() {

override fun getName(): String {
return "index_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI),
RestHandler.Route(
RestRequest.Method.PUT,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("workflowID", Workflow.NO_ID)
if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) {
throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID"))
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}
val workflowRequest =
IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles)

return RestChannelConsumer { channel ->
client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method()))
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener<IndexWorkflowResponse> {
return object : RestResponseListener<IndexWorkflowResponse>(channel) {
@Throws(Exception::class)
override fun buildResponse(response: IndexWorkflowResponse): RestResponse {
var returnStatus = RestStatus.CREATED
if (restMethod == RestRequest.Method.PUT)
returnStatus = RestStatus.OK

val restResponse =
BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (returnStatus == RestStatus.CREATED) {
val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"required": true
},
"_meta" : {
"schema_version": 4
"schema_version": 5
},
"properties": {
"schema_version": {
Expand All @@ -25,6 +25,9 @@
"severity": {
"type": "keyword"
},
"execution_id": {
"type": "keyword"
},
"monitor_name": {
"type": "text",
"fields": {
Expand Down
13 changes: 12 additions & 1 deletion alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.alerting

import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.commons.alerting.action.AlertingActions

val ALL_ACCESS_ROLE = "all_access"
val READALL_AND_MONITOR_ROLE = "readall_and_monitor"
val ALERTING_FULL_ACCESS_ROLE = "alerting_full_access"
Expand All @@ -16,11 +19,15 @@ val ALERTING_GET_EMAIL_GROUP_ACCESS = "alerting_get_email_group_access"
val ALERTING_SEARCH_EMAIL_GROUP_ACCESS = "alerting_search_email_group_access"
val ALERTING_INDEX_MONITOR_ACCESS = "alerting_index_monitor_access"
val ALERTING_GET_MONITOR_ACCESS = "alerting_get_monitor_access"
val ALERTING_GET_WORKFLOW_ACCESS = "alerting_get_workflow_access"
val ALERTING_DELETE_WORKFLOW_ACCESS = "alerting_delete_workflow_access"
val ALERTING_SEARCH_MONITOR_ONLY_ACCESS = "alerting_search_monitor_access"
val ALERTING_EXECUTE_MONITOR_ACCESS = "alerting_execute_monitor_access"
val ALERTING_EXECUTE_WORKFLOW_ACCESS = "alerting_execute_workflow_access"
val ALERTING_DELETE_MONITOR_ACCESS = "alerting_delete_monitor_access"
val ALERTING_GET_DESTINATION_ACCESS = "alerting_get_destination_access"
val ALERTING_GET_ALERTS_ACCESS = "alerting_get_alerts_access"
val ALERTING_INDEX_WORKFLOW_ACCESS = "alerting_index_workflow_access"

val ROLE_TO_PERMISSION_MAPPING = mapOf(
ALERTING_NO_ACCESS_ROLE to "",
Expand All @@ -30,9 +37,13 @@ val ROLE_TO_PERMISSION_MAPPING = mapOf(
ALERTING_SEARCH_EMAIL_GROUP_ACCESS to "cluster:admin/opendistro/alerting/destination/email_group/search",
ALERTING_INDEX_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/write",
ALERTING_GET_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/get",
ALERTING_GET_WORKFLOW_ACCESS to AlertingActions.GET_WORKFLOW_ACTION_NAME,
ALERTING_SEARCH_MONITOR_ONLY_ACCESS to "cluster:admin/opendistro/alerting/monitor/search",
ALERTING_EXECUTE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/execute",
ALERTING_EXECUTE_WORKFLOW_ACCESS to ExecuteWorkflowAction.NAME,
ALERTING_DELETE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/delete",
ALERTING_GET_DESTINATION_ACCESS to "cluster:admin/opendistro/alerting/destination/get",
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get"
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get",
ALERTING_INDEX_WORKFLOW_ACCESS to AlertingActions.INDEX_WORKFLOW_ACTION_NAME,
ALERTING_DELETE_WORKFLOW_ACCESS to AlertingActions.DELETE_WORKFLOW_ACTION_NAME
)
Loading