Skip to content

Commit

Permalink
Added rest layer for the workflow. (#963)
Browse files Browse the repository at this point in the history
* Added rest layer for the workflow. Added secure tests (#886)

* Added rest layer for the workflow. Added secure tests

* add execution_id field in alert mapping
---------

Signed-off-by: Stevan Buzejic <[email protected]>
Signed-off-by: Surya Sashank Nistala <[email protected]>
Co-authored-by: Stevan Buzejic <[email protected]>
  • Loading branch information
eirsep and stevanbz committed Jun 21, 2023
1 parent 0df3e33 commit e88921f
Show file tree
Hide file tree
Showing 11 changed files with 2,896 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction
import org.opensearch.alerting.resthandler.RestExecuteMonitorAction
import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
import org.opensearch.alerting.resthandler.RestIndexWorkflowAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
Expand Down Expand Up @@ -127,7 +130,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")

@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"

@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"

@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
Expand Down Expand Up @@ -171,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetMonitorAction(),
RestDeleteMonitorAction(),
RestIndexMonitorAction(),
RestIndexWorkflowAction(),
RestSearchMonitorAction(settings, clusterService),
RestExecuteMonitorAction(),
RestAcknowledgeAlertAction(),
Expand All @@ -181,7 +185,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction(),
RestGetFindingsAction()
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import java.io.IOException

/**
* This class consists of the REST handler to delete workflows.
*/
class RestDeleteWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "delete_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.DELETE,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false)
log.debug("${request.method()} ${request.uri()}")

val refreshPolicy =
WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value))
val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors)

return RestChannelConsumer { channel ->
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest,
RestToXContentListener(channel)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.context
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.action.RestToXContentListener
import org.opensearch.search.fetch.subphase.FetchSourceContext

/**
* This class consists of the REST handler to retrieve a workflow .
*/
class RestGetWorkflowAction : BaseRestHandler() {

private val log = LogManager.getLogger(javaClass)

override fun getName(): String {
return "get_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(
RestRequest.Method.GET,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}")

val workflowId = request.param("workflowID")
if (workflowId == null || workflowId.isEmpty()) {
throw IllegalArgumentException("missing id")
}

var srcContext = context(request)
if (request.method() == RestRequest.Method.HEAD) {
srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE
}
val getWorkflowRequest =
GetWorkflowRequest(workflowId, request.method())
return RestChannelConsumer {
channel ->
client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel))
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/
package org.opensearch.alerting.resthandler

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BaseRestHandler.RestChannelConsumer
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestChannel
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestResponse
import org.opensearch.rest.RestStatus
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant

/**
* Rest handlers to create and update workflows.
*/
class RestIndexWorkflowAction : BaseRestHandler() {

override fun getName(): String {
return "index_workflow_action"
}

override fun routes(): List<RestHandler.Route> {
return listOf(
RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI),
RestHandler.Route(
RestRequest.Method.PUT,
"${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val id = request.param("workflowID", Workflow.NO_ID)
if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) {
throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID"))
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM)
val refreshPolicy = if (request.hasParam(REFRESH)) {
WriteRequest.RefreshPolicy.parse(request.param(REFRESH))
} else {
WriteRequest.RefreshPolicy.IMMEDIATE
}
val workflowRequest =
IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles)

return RestChannelConsumer { channel ->
client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method()))
}
}

private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener<IndexWorkflowResponse> {
return object : RestResponseListener<IndexWorkflowResponse>(channel) {
@Throws(Exception::class)
override fun buildResponse(response: IndexWorkflowResponse): RestResponse {
var returnStatus = RestStatus.CREATED
if (restMethod == RestRequest.Method.PUT)
returnStatus = RestStatus.OK

val restResponse =
BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (returnStatus == RestStatus.CREATED) {
val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}"
restResponse.addHeader("Location", location)
}
return restResponse
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"required": true
},
"_meta" : {
"schema_version": 4
"schema_version": 5
},
"properties": {
"schema_version": {
Expand All @@ -25,6 +25,9 @@
"severity": {
"type": "keyword"
},
"execution_id": {
"type": "keyword"
},
"monitor_name": {
"type": "text",
"fields": {
Expand Down
13 changes: 12 additions & 1 deletion alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.alerting

import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.commons.alerting.action.AlertingActions

val ALL_ACCESS_ROLE = "all_access"
val READALL_AND_MONITOR_ROLE = "readall_and_monitor"
val ALERTING_FULL_ACCESS_ROLE = "alerting_full_access"
Expand All @@ -16,11 +19,15 @@ val ALERTING_GET_EMAIL_GROUP_ACCESS = "alerting_get_email_group_access"
val ALERTING_SEARCH_EMAIL_GROUP_ACCESS = "alerting_search_email_group_access"
val ALERTING_INDEX_MONITOR_ACCESS = "alerting_index_monitor_access"
val ALERTING_GET_MONITOR_ACCESS = "alerting_get_monitor_access"
val ALERTING_GET_WORKFLOW_ACCESS = "alerting_get_workflow_access"
val ALERTING_DELETE_WORKFLOW_ACCESS = "alerting_delete_workflow_access"
val ALERTING_SEARCH_MONITOR_ONLY_ACCESS = "alerting_search_monitor_access"
val ALERTING_EXECUTE_MONITOR_ACCESS = "alerting_execute_monitor_access"
val ALERTING_EXECUTE_WORKFLOW_ACCESS = "alerting_execute_workflow_access"
val ALERTING_DELETE_MONITOR_ACCESS = "alerting_delete_monitor_access"
val ALERTING_GET_DESTINATION_ACCESS = "alerting_get_destination_access"
val ALERTING_GET_ALERTS_ACCESS = "alerting_get_alerts_access"
val ALERTING_INDEX_WORKFLOW_ACCESS = "alerting_index_workflow_access"

val ROLE_TO_PERMISSION_MAPPING = mapOf(
ALERTING_NO_ACCESS_ROLE to "",
Expand All @@ -30,9 +37,13 @@ val ROLE_TO_PERMISSION_MAPPING = mapOf(
ALERTING_SEARCH_EMAIL_GROUP_ACCESS to "cluster:admin/opendistro/alerting/destination/email_group/search",
ALERTING_INDEX_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/write",
ALERTING_GET_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/get",
ALERTING_GET_WORKFLOW_ACCESS to AlertingActions.GET_WORKFLOW_ACTION_NAME,
ALERTING_SEARCH_MONITOR_ONLY_ACCESS to "cluster:admin/opendistro/alerting/monitor/search",
ALERTING_EXECUTE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/execute",
ALERTING_EXECUTE_WORKFLOW_ACCESS to ExecuteWorkflowAction.NAME,
ALERTING_DELETE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/delete",
ALERTING_GET_DESTINATION_ACCESS to "cluster:admin/opendistro/alerting/destination/get",
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get"
ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get",
ALERTING_INDEX_WORKFLOW_ACCESS to AlertingActions.INDEX_WORKFLOW_ACTION_NAME,
ALERTING_DELETE_WORKFLOW_ACCESS to AlertingActions.DELETE_WORKFLOW_ACTION_NAME
)
Loading

0 comments on commit e88921f

Please sign in to comment.