diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index babd25938..f63406d77 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -27,6 +27,7 @@ import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSetting import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction +import org.opensearch.alerting.resthandler.RestDeleteWorkflowAction import org.opensearch.alerting.resthandler.RestExecuteMonitorAction import org.opensearch.alerting.resthandler.RestGetAlertsAction import org.opensearch.alerting.resthandler.RestGetDestinationsAction @@ -34,7 +35,9 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction import org.opensearch.alerting.resthandler.RestGetEmailGroupAction import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction +import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction +import org.opensearch.alerting.resthandler.RestIndexWorkflowAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction @@ -127,7 +130,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}") @JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors" - + @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" @@ -171,6 +174,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetMonitorAction(), RestDeleteMonitorAction(), RestIndexMonitorAction(), + RestIndexWorkflowAction(), RestSearchMonitorAction(settings, clusterService), RestExecuteMonitorAction(), RestAcknowledgeAlertAction(), @@ -181,7 +185,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetEmailGroupAction(), RestGetDestinationsAction(), RestGetAlertsAction(), - RestGetFindingsAction() + RestGetFindingsAction(), + RestGetWorkflowAction(), + RestDeleteWorkflowAction() ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt new file mode 100644 index 000000000..a61a9b51c --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestDeleteWorkflowAction.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.io.IOException + +/** + * This class consists of the REST handler to delete workflows. + */ +class RestDeleteWorkflowAction : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "delete_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route( + RestRequest.Method.DELETE, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") + + val workflowId = request.param("workflowID") + val deleteDelegateMonitors = request.paramAsBoolean("deleteDelegateMonitors", false) + log.debug("${request.method()} ${request.uri()}") + + val refreshPolicy = + WriteRequest.RefreshPolicy.parse(request.param(REFRESH, WriteRequest.RefreshPolicy.IMMEDIATE.value)) + val deleteWorkflowRequest = DeleteWorkflowRequest(workflowId, deleteDelegateMonitors) + + return RestChannelConsumer { channel -> + client.execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, deleteWorkflowRequest, + RestToXContentListener(channel) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt new file mode 100644 index 000000000..1a2ca4426 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetWorkflowAction.kt @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.context +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import org.opensearch.search.fetch.subphase.FetchSourceContext + +/** + * This class consists of the REST handler to retrieve a workflow . + */ +class RestGetWorkflowAction : BaseRestHandler() { + + private val log = LogManager.getLogger(javaClass) + + override fun getName(): String { + return "get_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route( + RestRequest.Method.GET, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}") + + val workflowId = request.param("workflowID") + if (workflowId == null || workflowId.isEmpty()) { + throw IllegalArgumentException("missing id") + } + + var srcContext = context(request) + if (request.method() == RestRequest.Method.HEAD) { + srcContext = FetchSourceContext.DO_NOT_FETCH_SOURCE + } + val getWorkflowRequest = + GetWorkflowRequest(workflowId, request.method()) + return RestChannelConsumer { + channel -> + client.execute(AlertingActions.GET_WORKFLOW_ACTION_TYPE, getWorkflowRequest, RestToXContentListener(channel)) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt new file mode 100644 index 000000000..faa0ae065 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexWorkflowAction.kt @@ -0,0 +1,99 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ +package org.opensearch.alerting.resthandler + +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IF_PRIMARY_TERM +import org.opensearch.alerting.util.IF_SEQ_NO +import org.opensearch.alerting.util.REFRESH +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BaseRestHandler.RestChannelConsumer +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestChannel +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestResponse +import org.opensearch.rest.RestStatus +import org.opensearch.rest.action.RestResponseListener +import java.io.IOException +import java.time.Instant + +/** + * Rest handlers to create and update workflows. + */ +class RestIndexWorkflowAction : BaseRestHandler() { + + override fun getName(): String { + return "index_workflow_action" + } + + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.POST, AlertingPlugin.WORKFLOW_BASE_URI), + RestHandler.Route( + RestRequest.Method.PUT, + "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val id = request.param("workflowID", Workflow.NO_ID) + if (request.method() == RestRequest.Method.PUT && Workflow.NO_ID == id) { + throw AlertingException.wrap(IllegalArgumentException("Missing workflow ID")) + } + + // Validate request by parsing JSON to Monitor + val xcp = request.contentParser() + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val workflow = Workflow.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + val rbacRoles = request.contentParser().map()["rbac_roles"] as List? + + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) + val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + val refreshPolicy = if (request.hasParam(REFRESH)) { + WriteRequest.RefreshPolicy.parse(request.param(REFRESH)) + } else { + WriteRequest.RefreshPolicy.IMMEDIATE + } + val workflowRequest = + IndexWorkflowRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), workflow, rbacRoles) + + return RestChannelConsumer { channel -> + client.execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, workflowRequest, indexMonitorResponse(channel, request.method())) + } + } + + private fun indexMonitorResponse(channel: RestChannel, restMethod: RestRequest.Method): RestResponseListener { + return object : RestResponseListener(channel) { + @Throws(Exception::class) + override fun buildResponse(response: IndexWorkflowResponse): RestResponse { + var returnStatus = RestStatus.CREATED + if (restMethod == RestRequest.Method.PUT) + returnStatus = RestStatus.OK + + val restResponse = + BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)) + if (returnStatus == RestStatus.CREATED) { + val location = "${AlertingPlugin.WORKFLOW_BASE_URI}/${response.id}" + restResponse.addHeader("Location", location) + } + return restResponse + } + } + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index fcb1d1c94..4cc60f233 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -4,7 +4,7 @@ "required": true }, "_meta" : { - "schema_version": 4 + "schema_version": 5 }, "properties": { "schema_version": { @@ -25,6 +25,9 @@ "severity": { "type": "keyword" }, + "execution_id": { + "type": "keyword" + }, "monitor_name": { "type": "text", "fields": { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt index d3b73779c..7f415a8ac 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AccessRoles.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting +import org.opensearch.alerting.action.ExecuteWorkflowAction +import org.opensearch.commons.alerting.action.AlertingActions + val ALL_ACCESS_ROLE = "all_access" val READALL_AND_MONITOR_ROLE = "readall_and_monitor" val ALERTING_FULL_ACCESS_ROLE = "alerting_full_access" @@ -16,11 +19,15 @@ val ALERTING_GET_EMAIL_GROUP_ACCESS = "alerting_get_email_group_access" val ALERTING_SEARCH_EMAIL_GROUP_ACCESS = "alerting_search_email_group_access" val ALERTING_INDEX_MONITOR_ACCESS = "alerting_index_monitor_access" val ALERTING_GET_MONITOR_ACCESS = "alerting_get_monitor_access" +val ALERTING_GET_WORKFLOW_ACCESS = "alerting_get_workflow_access" +val ALERTING_DELETE_WORKFLOW_ACCESS = "alerting_delete_workflow_access" val ALERTING_SEARCH_MONITOR_ONLY_ACCESS = "alerting_search_monitor_access" val ALERTING_EXECUTE_MONITOR_ACCESS = "alerting_execute_monitor_access" +val ALERTING_EXECUTE_WORKFLOW_ACCESS = "alerting_execute_workflow_access" val ALERTING_DELETE_MONITOR_ACCESS = "alerting_delete_monitor_access" val ALERTING_GET_DESTINATION_ACCESS = "alerting_get_destination_access" val ALERTING_GET_ALERTS_ACCESS = "alerting_get_alerts_access" +val ALERTING_INDEX_WORKFLOW_ACCESS = "alerting_index_workflow_access" val ROLE_TO_PERMISSION_MAPPING = mapOf( ALERTING_NO_ACCESS_ROLE to "", @@ -30,9 +37,13 @@ val ROLE_TO_PERMISSION_MAPPING = mapOf( ALERTING_SEARCH_EMAIL_GROUP_ACCESS to "cluster:admin/opendistro/alerting/destination/email_group/search", ALERTING_INDEX_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/write", ALERTING_GET_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/get", + ALERTING_GET_WORKFLOW_ACCESS to AlertingActions.GET_WORKFLOW_ACTION_NAME, ALERTING_SEARCH_MONITOR_ONLY_ACCESS to "cluster:admin/opendistro/alerting/monitor/search", ALERTING_EXECUTE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/execute", + ALERTING_EXECUTE_WORKFLOW_ACCESS to ExecuteWorkflowAction.NAME, ALERTING_DELETE_MONITOR_ACCESS to "cluster:admin/opendistro/alerting/monitor/delete", ALERTING_GET_DESTINATION_ACCESS to "cluster:admin/opendistro/alerting/destination/get", - ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get" + ALERTING_GET_ALERTS_ACCESS to "cluster:admin/opendistro/alerting/alerts/get", + ALERTING_INDEX_WORKFLOW_ACCESS to AlertingActions.INDEX_WORKFLOW_ACTION_NAME, + ALERTING_DELETE_WORKFLOW_ACCESS to AlertingActions.DELETE_WORKFLOW_ACTION_NAME ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 3e676af40..3c79ad683 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -55,6 +55,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.util.string import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent @@ -70,6 +71,7 @@ import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.Locale import java.util.UUID +import java.util.stream.Collectors import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory @@ -158,6 +160,34 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response } + protected fun deleteWorkflow(workflow: Workflow, deleteDelegates: Boolean = false, refresh: Boolean = true): Response { + val response = client().makeRequest( + "DELETE", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}?refresh=$refresh&deleteDelegateMonitors=$deleteDelegates", + emptyMap(), + workflow.toHttpEntity() + ) + assertEquals("Unable to delete a workflow", RestStatus.OK, response.restStatus()) + return response + } + + protected fun deleteWorkflowWithClient( + client: RestClient, + workflow: Workflow, + deleteDelegates: Boolean = false, + refresh: Boolean = true + ): Response { + val response = client.makeRequest( + "DELETE", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}?refresh=$refresh&deleteDelegateMonitors=$deleteDelegates", + emptyMap(), + workflow.toHttpEntity() + ) + assertEquals("Unable to delete a workflow", RestStatus.OK, response.restStatus()) + + return response + } + /** * Destinations are now deprecated in favor of the Notification plugin's configs. * This method should only be used for checking legacy behavior/Notification migration scenarios. @@ -531,6 +561,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return getMonitor(monitorId = monitor.id) } + @Suppress("UNCHECKED_CAST") + protected fun updateWorkflow(workflow: Workflow, refresh: Boolean = false): Workflow { + val response = client().makeRequest( + "PUT", + "${workflow.relativeUrl()}?refresh=$refresh", + emptyMap(), + workflow.toHttpEntity() + ) + assertEquals("Unable to update a workflow", RestStatus.OK, response.restStatus()) + assertUserNull(response.asMap()["workflow"] as Map) + return getWorkflow(workflowId = workflow.id) + } + protected fun updateMonitorWithClient( client: RestClient, monitor: Monitor, @@ -548,6 +591,23 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return getMonitor(monitorId = monitor.id) } + protected fun updateWorkflowWithClient( + client: RestClient, + workflow: Workflow, + rbacRoles: List = emptyList(), + refresh: Boolean = true + ): Workflow { + val response = client.makeRequest( + "PUT", + "${workflow.relativeUrl()}?refresh=$refresh", + emptyMap(), + createWorkflowEntityWithBackendRoles(workflow, rbacRoles) + ) + assertEquals("Unable to update a workflow", RestStatus.OK, response.restStatus()) + assertUserNull(response.asMap()["workflow"] as Map) + return getWorkflow(workflowId = workflow.id) + } + protected fun getMonitor(monitorId: String, header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json")): Monitor { val response = client().makeRequest("GET", "$ALERTING_BASE_URI/$monitorId", null, header) assertEquals("Unable to get monitor $monitorId", RestStatus.OK, response.restStatus()) @@ -721,10 +781,18 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return executeMonitor(client(), monitorId, params) } + protected fun executeWorkflow(workflowId: String, params: Map = mutableMapOf()): Response { + return executeWorkflow(client(), workflowId, params) + } + protected fun executeMonitor(client: RestClient, monitorId: String, params: Map = mutableMapOf()): Response { return client.makeRequest("POST", "$ALERTING_BASE_URI/$monitorId/_execute", params) } + protected fun executeWorkflow(client: RestClient, workflowId: String, params: Map = mutableMapOf()): Response { + return client.makeRequest("POST", "$WORKFLOW_ALERTING_BASE_URI/$workflowId/_execute", params) + } + protected fun executeMonitor(monitor: Monitor, params: Map = mapOf()): Response { return executeMonitor(client(), monitor, params) } @@ -1208,6 +1276,37 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { client().performRequest(request) } + private fun createCustomIndexRole(name: String, index: String, clusterPermissions: List) { + val request = Request("PUT", "/_plugins/_security/api/roles/$name") + + val clusterPermissionsStr = + clusterPermissions.stream().map { p: String? -> "\"" + p + "\"" }.collect( + Collectors.joining(",") + ) + + var entity = "{\n" + + "\"cluster_permissions\": [\n" + + "$clusterPermissionsStr\n" + + "],\n" + + "\"index_permissions\": [\n" + + "{\n" + + "\"index_patterns\": [\n" + + "\"$index\"\n" + + "],\n" + + "\"dls\": \"\",\n" + + "\"fls\": [],\n" + + "\"masked_fields\": [],\n" + + "\"allowed_actions\": [\n" + + "\"crud\"\n" + + "]\n" + + "}\n" + + "],\n" + + "\"tenant_permissions\": []\n" + + "}" + request.setJsonEntity(entity) + client().performRequest(request) + } + fun createIndexRoleWithDocLevelSecurity(name: String, index: String, dlsQuery: String, clusterPermissions: String? = "") { val request = Request("PUT", "/_plugins/_security/api/roles/$name") var entity = "{\n" + @@ -1233,6 +1332,36 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { client().performRequest(request) } + fun createIndexRoleWithDocLevelSecurity(name: String, index: String, dlsQuery: String, clusterPermissions: List) { + val clusterPermissionsStr = + clusterPermissions.stream().map { p: String -> "\"" + getClusterPermissionsFromCustomRole(p) + "\"" }.collect( + Collectors.joining(",") + ) + + val request = Request("PUT", "/_plugins/_security/api/roles/$name") + var entity = "{\n" + + "\"cluster_permissions\": [\n" + + "$clusterPermissionsStr\n" + + "],\n" + + "\"index_permissions\": [\n" + + "{\n" + + "\"index_patterns\": [\n" + + "\"$index\"\n" + + "],\n" + + "\"dls\": \"$dlsQuery\",\n" + + "\"fls\": [],\n" + + "\"masked_fields\": [],\n" + + "\"allowed_actions\": [\n" + + "\"crud\"\n" + + "]\n" + + "}\n" + + "],\n" + + "\"tenant_permissions\": []\n" + + "}" + request.setJsonEntity(entity) + client().performRequest(request) + } + fun createUserRolesMapping(role: String, users: Array) { val request = Request("PUT", "/_plugins/_security/api/rolesmapping/$role") val usersStr = users.joinToString { it -> "\"$it\"" } @@ -1298,6 +1427,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { createUserRolesMapping(role, arrayOf(user)) } + fun createUserWithTestDataAndCustomRole( + user: String, + index: String, + role: String, + backendRoles: List, + clusterPermissions: List + ) { + createUser(user, backendRoles.toTypedArray()) + createTestIndex(index) + createCustomIndexRole(role, index, clusterPermissions) + createUserRolesMapping(role, arrayOf(user)) + } + fun createUserWithRoles( user: String, roles: List, @@ -1385,4 +1527,83 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { } } } + + protected fun createRandomWorkflow(monitorIds: List, refresh: Boolean = false): Workflow { + val workflow = randomWorkflow(monitorIds = monitorIds) + return createWorkflow(workflow, refresh) + } + + private fun createWorkflowEntityWithBackendRoles(workflow: Workflow, rbacRoles: List?): HttpEntity { + if (rbacRoles == null) { + return workflow.toHttpEntity() + } + val temp = workflow.toJsonString() + val toReplace = temp.lastIndexOf("}") + val rbacString = rbacRoles.joinToString { "\"$it\"" } + val jsonString = temp.substring(0, toReplace) + ", \"rbac_roles\": [$rbacString] }" + return StringEntity(jsonString, ContentType.APPLICATION_JSON) + } + + protected fun createWorkflowWithClient( + client: RestClient, + workflow: Workflow, + rbacRoles: List? = null, + refresh: Boolean = true + ): Workflow { + val response = client.makeRequest( + "POST", "$WORKFLOW_ALERTING_BASE_URI?refresh=$refresh", emptyMap(), + createWorkflowEntityWithBackendRoles(workflow, rbacRoles) + ) + assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus()) + + val workflowJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, + response.entity.content + ).map() + assertUserNull(workflowJson as HashMap) + return workflow.copy(id = workflowJson["_id"] as String) + } + + protected fun createWorkflow(workflow: Workflow, refresh: Boolean = true): Workflow { + return createWorkflowWithClient(client(), workflow, emptyList(), refresh) + } + + protected fun Workflow.toHttpEntity(): HttpEntity { + return StringEntity(toJsonString(), APPLICATION_JSON) + } + + private fun Workflow.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return shuffleXContent(toXContent(builder, ToXContent.EMPTY_PARAMS)).string() + } + + protected fun getWorkflow( + workflowId: String, + header: BasicHeader = BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json"), + ): Workflow { + val response = client().makeRequest("GET", "$WORKFLOW_ALERTING_BASE_URI/$workflowId", null, header) + assertEquals("Unable to get workflow $workflowId", RestStatus.OK, response.restStatus()) + + val parser = createParser(XContentType.JSON.xContent(), response.entity.content) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser) + + lateinit var id: String + var version: Long = 0 + lateinit var workflow: Workflow + + while (parser.nextToken() != XContentParser.Token.END_OBJECT) { + parser.nextToken() + + when (parser.currentName()) { + "_id" -> id = parser.text() + "_version" -> version = parser.longValue() + "workflow" -> workflow = Workflow.parse(parser) + } + } + + assertUserNull(workflow) + return workflow.copy(id = id, version = version) + } + + protected fun Workflow.relativeUrl() = "$WORKFLOW_ALERTING_BASE_URI/$id" } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 0eed1a1d6..b72c08865 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -254,7 +254,8 @@ fun randomWorkflow( user = user, inputs = listOf(CompositeInput(Sequence(delegates))), version = -1L, - schemaVersion = 0 + schemaVersion = 0, + triggers = listOf() ) } @@ -279,7 +280,8 @@ fun randomWorkflowWithDelegates( user = user, inputs = listOf(CompositeInput(Sequence(delegates))), version = -1L, - schemaVersion = 0 + schemaVersion = 0, + triggers = emptyList() ) } @@ -391,6 +393,7 @@ fun randomScript(source: String = "return " + OpenSearchRestTestCase.randomBoole val ADMIN = "admin" val ALERTING_BASE_URI = "/_plugins/_alerting/monitors" +val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows" val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors" val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index bdfc23513..4ce7bd2be 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -54,7 +54,7 @@ class AlertIndicesIT : AlertingRestTestCase() { putAlertMappings( AlertIndices.alertMapping().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 4", "\"schema_version\": 0") + .replace("\"schema_version\": 5", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) @@ -65,8 +65,8 @@ class AlertIndicesIT : AlertingRestTestCase() { assertIndexExists(AlertIndices.ALERT_INDEX) assertIndexExists(AlertIndices.ALERT_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 7) - verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 4) - verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 4) + verifyIndexSchemaVersion(AlertIndices.ALERT_INDEX, 5) + verifyIndexSchemaVersion(AlertIndices.ALERT_HISTORY_WRITE_INDEX, 5) } fun `test update finding index mapping with new schema version`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt new file mode 100644 index 000000000..df9c236fa --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt @@ -0,0 +1,1418 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.http.HttpHeaders +import org.apache.http.entity.ContentType +import org.apache.http.message.BasicHeader +import org.apache.http.nio.entity.NStringEntity +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.opensearch.alerting.ALERTING_BASE_URI +import org.opensearch.alerting.ALERTING_DELETE_WORKFLOW_ACCESS +import org.opensearch.alerting.ALERTING_EXECUTE_WORKFLOW_ACCESS +import org.opensearch.alerting.ALERTING_FULL_ACCESS_ROLE +import org.opensearch.alerting.ALERTING_GET_WORKFLOW_ACCESS +import org.opensearch.alerting.ALERTING_INDEX_MONITOR_ACCESS +import org.opensearch.alerting.ALERTING_INDEX_WORKFLOW_ACCESS +import org.opensearch.alerting.ALERTING_NO_ACCESS_ROLE +import org.opensearch.alerting.ALERTING_READ_ONLY_ACCESS +import org.opensearch.alerting.ALWAYS_RUN +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.READALL_AND_MONITOR_ROLE +import org.opensearch.alerting.TERM_DLS_QUERY +import org.opensearch.alerting.TEST_HR_BACKEND_ROLE +import org.opensearch.alerting.TEST_HR_INDEX +import org.opensearch.alerting.TEST_HR_ROLE +import org.opensearch.alerting.TEST_NON_HR_INDEX +import org.opensearch.alerting.WORKFLOW_ALERTING_BASE_URI +import org.opensearch.alerting.assertUserNull +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomBucketLevelMonitor +import org.opensearch.alerting.randomBucketLevelTrigger +import org.opensearch.alerting.randomDocLevelQuery +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomWorkflow +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.client.RestClient +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentType +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.rest.SecureRestClientBuilder +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.script.Script +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.junit.annotations.TestLogging +import java.time.Instant + +@TestLogging("level:DEBUG", reason = "Debug for tests.") +@Suppress("UNCHECKED_CAST") +class SecureWorkflowRestApiIT : AlertingRestTestCase() { + + companion object { + + @BeforeClass + @JvmStatic + fun setup() { + // things to execute once and keep around for the class + org.junit.Assume.assumeTrue(System.getProperty("security", "false")!!.toBoolean()) + } + } + + val user = "userD" + var userClient: RestClient? = null + + @Before + fun create() { + if (userClient == null) { + createUser(user, arrayOf()) + userClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), user, user).setSocketTimeout(60000).build() + } + } + + @After + fun cleanup() { + userClient?.close() + deleteUser(user) + } + + // Create Workflow related security tests + fun `test create workflow with an user with alerting role`() { + val clusterPermissions = listOf( + getClusterPermissionsFromCustomRole(ALERTING_INDEX_WORKFLOW_ACCESS) + ) + + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + clusterPermissions + ) + try { + val monitor = createMonitor( + randomQueryLevelMonitor( + inputs = listOf(SearchInput(listOf(TEST_HR_INDEX), SearchSourceBuilder().query(QueryBuilders.matchAllQuery()))), + ), + true + ) + + val workflow = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + + val createResponse = userClient?.makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse?.restStatus()) + + assertUserNull(createResponse?.asMap()!!["workflow"] as HashMap) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test create workflow with an user without alerting role`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_NO_ACCESS_ROLE) + ) + try { + val monitor = createRandomMonitor(true) + + val workflow = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + + userClient?.makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + fail("Expected 403 Method FORBIDDEN response") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test create workflow with an user with read-only role`() { + createUserWithTestData(user, TEST_HR_INDEX, TEST_HR_ROLE, TEST_HR_BACKEND_ROLE) + createUserRolesMapping(ALERTING_READ_ONLY_ACCESS, arrayOf(user)) + + try { + val monitor = createRandomMonitor(true) + val workflow = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + userClient?.makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + fail("Expected 403 Method FORBIDDEN response") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteRoleMapping(ALERTING_READ_ONLY_ACCESS) + } + } + + fun `test create workflow with delegate with an user without index read role`() { + createTestIndex(TEST_NON_HR_INDEX) + val clusterPermissions = listOf( + getClusterPermissionsFromCustomRole(ALERTING_INDEX_WORKFLOW_ACCESS) + ) + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + clusterPermissions + ) + try { + val query = randomDocLevelQuery(tags = listOf()) + val triggers = listOf(randomDocumentLevelTrigger(condition = Script("query[id=\"${query.id}\"]"))) + + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf( + DocLevelMonitorInput( + indices = listOf(TEST_NON_HR_INDEX), + queries = listOf(query) + ) + ), + triggers = triggers + ), + true + ) + + val workflow = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + + userClient?.makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteIndex(TEST_NON_HR_INDEX) + } + } + + fun `test create workflow with disable filter by`() { + disableFilterBy() + val monitor = createRandomMonitor(true) + val workflow = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + assertUserNull(createResponse.asMap()["workflow"] as HashMap) + } + + fun `test get workflow with an user with get workflow role`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + + val monitor = createRandomMonitor(true) + val workflow = createWorkflow(randomWorkflow(monitorIds = listOf(monitor.id))) + + try { + val getWorkflowResponse = userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + /* + TODO: https://github.com/opensearch-project/alerting/issues/300 + */ + fun `test get workflow with an user without get monitor role`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_NO_ACCESS_ROLE) + ) + + val monitor = createRandomMonitor(true) + val workflow = createWorkflow(randomWorkflow(monitorIds = listOf(monitor.id))) + + try { + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected 403 Method FORBIDDEN response") + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun getDocs(response: Response?): Any? { + val hits = createParser( + XContentType.JSON.xContent(), + response?.entity?.content + ).map()["hits"]!! as Map> + return hits["total"]?.get("value") + } + + // Query Monitors related security tests + fun `test update workflow with disable filter by`() { + disableFilterBy() + + val createdMonitor = createMonitor(monitor = randomQueryLevelMonitor(enabled = true)) + val createdWorkflow = createWorkflow( + randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true, enabledTime = Instant.now()) + ) + + assertNotNull("The workflow was not created", createdWorkflow) + assertTrue("The workflow was not enabled", createdWorkflow.enabled) + + val workflowV2 = createdWorkflow.copy(enabled = false, enabledTime = null) + val updatedWorkflow = updateWorkflow(workflowV2) + + assertFalse("The monitor was not disabled", updatedWorkflow.enabled) + } + + fun `test update workflow with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + + val createdMonitor = createMonitorWithClient( + client = client(), + monitor = randomQueryLevelMonitor(enabled = true), + rbacRoles = listOf("admin") + ) + val createdWorkflow = createWorkflow( + randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true, enabledTime = Instant.now()) + ) + + assertNotNull("The workflow was not created", createdWorkflow) + assertTrue("The workflow was not enabled", createdWorkflow.enabled) + + val workflowV2 = createdWorkflow.copy(enabled = false, enabledTime = null) + val updatedWorkflow = updateWorkflow(workflow = workflowV2) + + assertFalse("The monitor was not disabled", updatedWorkflow.enabled) + } + + fun `test create workflow with enable filter by with a user have access and without role has no access`() { + enableFilterBy() + if (!isHttps()) { + return + } + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient( + userClient!!, + monitor = randomQueryLevelMonitor(enabled = true), + listOf(TEST_HR_BACKEND_ROLE, "role2") + ) + + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + userClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true), + listOf(TEST_HR_BACKEND_ROLE, "role2") + ) + assertNotNull("The workflow was not created", createdWorkflow) + + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + + // getUser should have access to the monitor + val getUser = "getUser" + createUserWithTestDataAndCustomRole( + getUser, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf("role2"), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) + .setSocketTimeout(60000).build() + + val getWorkflowResponse = getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + // Remove backend role and ensure no access is granted after + patchUserBackendRoles(getUser, arrayOf("role1")) + try { + getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteUser(getUser) + getUserClient?.close() + } + } + + fun `test create workflow with enable filter by with a user with a backend role doesn't have access to monitor`() { + enableFilterBy() + if (!isHttps()) { + return + } + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient( + userClient!!, + monitor = randomQueryLevelMonitor(enabled = true), + listOf("role2") + ) + + assertNotNull("The monitor was not created", createdMonitor) + + val userWithDifferentRole = "role3User" + + createUserWithRoles( + userWithDifferentRole, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role3"), + false + ) + + val userWithDifferentRoleClient = SecureRestClientBuilder( + clusterHosts.toTypedArray(), isHttps(), userWithDifferentRole, userWithDifferentRole + ) + .setSocketTimeout(60000).build() + + try { + createWorkflowWithClient( + userWithDifferentRoleClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true), + listOf("role3") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Create workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteUser(userWithDifferentRole) + userWithDifferentRoleClient?.close() + } + } + + fun `test create workflow with enable filter by with no backend roles`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = createMonitor(randomQueryLevelMonitor(enabled = true)) + + val workflow = randomWorkflow(monitorIds = listOf(monitor.id)) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + try { + createWorkflowWithClient(userClient!!, workflow, listOf()) + fail("Expected exception since a non-admin user is trying to create a workflow with no backend roles") + } catch (e: ResponseException) { + assertEquals("Create workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test create workflow as admin with enable filter by with no backend roles`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitor(monitor = monitor) + val createdWorkflow = createWorkflow(randomWorkflow(monitorIds = listOf(createdMonitor.id))) + assertNotNull("The workflow was not created", createdWorkflow) + + try { + + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test create workflow with enable filter by with roles user has no access and throw exception`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = createMonitor(randomQueryLevelMonitor(enabled = true)) + val workflow = randomWorkflow(monitorIds = listOf(monitor.id)) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + try { + createWorkflowWithClient(userClient!!, workflow = workflow, listOf(TEST_HR_BACKEND_ROLE, "role1", "role2")) + fail("Expected create workflow to fail as user does not have role1 backend role") + } catch (e: ResponseException) { + assertEquals("Create workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test create workflow as admin with enable filter by with a user have access and without role has no access`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + val createdMonitor = createMonitorWithClient(client(), monitor = monitor, listOf(TEST_HR_BACKEND_ROLE, "role1", "role2")) + val createdWorkflow = createWorkflowWithClient( + client(), + randomWorkflow(monitorIds = listOf(createdMonitor.id)), + listOf(TEST_HR_BACKEND_ROLE, "role1", "role2") + ) + assertNotNull("The workflow was not created", createdWorkflow) + + // user should have access to the admin monitor + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + + val getWorkflowResponse = userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + // Remove good backend role and ensure no access is granted after + patchUserBackendRoles(user, arrayOf("role5")) + try { + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test update workflow with enable filter by with removing a permission`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient(userClient!!, randomQueryLevelMonitor(), listOf(TEST_HR_BACKEND_ROLE, "role2")) + val createdWorkflow = createWorkflowWithClient( + client = userClient!!, workflow = randomWorkflow(enabled = true, monitorIds = listOf(createdMonitor.id)), + rbacRoles = listOf(TEST_HR_BACKEND_ROLE, "role2") + ) + assertNotNull("The workflow was not created", createdWorkflow) + + // getUser should have access to the monitor + val getUser = "getUser" + createUserWithTestDataAndCustomRole( + getUser, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf("role2"), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) + .setSocketTimeout(60000).build() + + val getWorkflowResponse = getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + // Remove backend role from monitor + val updatedWorkflow = updateWorkflowWithClient(userClient!!, createdWorkflow, listOf(TEST_HR_BACKEND_ROLE)) + + // getUser should no longer have access + try { + getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${updatedWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get monitor failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteUser(getUser) + getUserClient?.close() + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test update workflow with enable filter by with no backend roles`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient(userClient!!, monitor = monitor, listOf("role2")) + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + userClient!!, + randomWorkflow(monitorIds = listOf(createdMonitor.id)), + listOf("role2") + ) + + assertNotNull("The workflow was not created", createdWorkflow) + + try { + updateWorkflowWithClient(userClient!!, createdWorkflow, listOf()) + } catch (e: ResponseException) { + assertEquals("Update monitor failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test update workflow as admin with enable filter by with no backend roles`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + val createdMonitorResponse = createMonitor(monitor, true) + assertNotNull("The monitor was not created", createdMonitorResponse) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + val workflow = randomWorkflow( + monitorIds = listOf(createdMonitorResponse.id) + ) + + val createdWorkflow = createWorkflowWithClient( + client(), + workflow = workflow, + rbacRoles = listOf(TEST_HR_BACKEND_ROLE) + ) + + assertNotNull("The workflow was not created", createdWorkflow) + + val getWorkflowResponse = userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + val updatedWorkflow = updateWorkflowWithClient(client(), createdWorkflow, listOf()) + + try { + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${updatedWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test update workflow with enable filter by with updating with a permission user has no access to and throw exception`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient(userClient!!, monitor = monitor, listOf(TEST_HR_BACKEND_ROLE, "role2")) + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + userClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id)), listOf(TEST_HR_BACKEND_ROLE, "role2") + ) + + assertNotNull("The workflow was not created", createdWorkflow) + + // getUser should have access to the monitor + val getUser = "getUser" + createUserWithTestDataAndCustomRole( + getUser, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf("role2"), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) + .setSocketTimeout(60000).build() + + val getWorkflowResponse = getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + try { + updateWorkflowWithClient(userClient!!, createdWorkflow, listOf(TEST_HR_BACKEND_ROLE, "role1")) + fail("Expected update workflow to fail as user doesn't have access to role1") + } catch (e: ResponseException) { + assertEquals("Update workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteUser(getUser) + getUserClient?.close() + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test update workflow as another user with enable filter by with removing a permission and adding permission`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient(userClient!!, monitor = monitor, listOf(TEST_HR_BACKEND_ROLE)) + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + userClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true) + ) + + assertNotNull("The workflow was not created", createdWorkflow) + + // Remove backend role from workflow with new user and add role5 + val updateUser = "updateUser" + createUserWithRoles( + updateUser, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role5"), + false + ) + + val updateUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), updateUser, updateUser) + .setSocketTimeout(60000).build() + val updatedWorkflow = updateWorkflowWithClient(updateUserClient, createdWorkflow, listOf("role5")) + + // old user should no longer have access + try { + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${updatedWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteUser(updateUser) + updateUserClient?.close() + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + } + + fun `test update workflow as admin with enable filter by with removing a permission`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val monitor = randomQueryLevelMonitor(enabled = true) + + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val createdMonitor = createMonitorWithClient(userClient!!, monitor = monitor, listOf(TEST_HR_BACKEND_ROLE, "role2")) + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + userClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id)), + listOf(TEST_HR_BACKEND_ROLE, "role2") + ) + assertNotNull("The workflow was not created", createdWorkflow) + + // getUser should have access to the monitor + val getUser = "getUser" + createUserWithTestDataAndCustomRole( + getUser, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf("role1", "role2"), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + val getUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), getUser, getUser) + .setSocketTimeout(60000).build() + + val getWorkflowResponse = getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + assertEquals("Get workflow failed", RestStatus.OK, getWorkflowResponse?.restStatus()) + + // Remove backend role from monitor + val updatedWorkflow = updateWorkflowWithClient(client(), createdWorkflow, listOf("role4")) + + // original user should no longer have access + try { + userClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${updatedWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf()) + createUserRolesMapping(READALL_AND_MONITOR_ROLE, arrayOf()) + } + + // get user should no longer have access + try { + getUserClient?.makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${updatedWorkflow.id}", + null, + BasicHeader(HttpHeaders.CONTENT_TYPE, "application/json") + ) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteUser(getUser) + getUserClient?.close() + } + } + + fun `test delete workflow with disable filter by`() { + disableFilterBy() + val monitor = randomQueryLevelMonitor(enabled = true) + + val createdMonitor = createMonitor(monitor = monitor) + val createdWorkflow = createWorkflow(workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true)) + + assertNotNull("The workflow was not created", createdWorkflow) + assertTrue("The workflow was not enabled", createdWorkflow.enabled) + + deleteWorkflow(workflow = createdWorkflow, deleteDelegates = true) + + val searchMonitor = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", createdMonitor.id)).toString() + // Verify if the delegate monitors are deleted + // search as "admin" - must get 0 docs + val adminMonitorSearchResponse = client().makeRequest( + "POST", + "$ALERTING_BASE_URI/_search", + emptyMap(), + NStringEntity(searchMonitor, ContentType.APPLICATION_JSON) + ) + assertEquals("Search monitor failed", RestStatus.OK, adminMonitorSearchResponse.restStatus()) + + val adminMonitorHits = createParser( + XContentType.JSON.xContent(), + adminMonitorSearchResponse.entity.content + ).map()["hits"]!! as Map> + val adminMonitorDocsFound = adminMonitorHits["total"]?.get("value") + assertEquals("Monitor found during search", 0, adminMonitorDocsFound) + + // Verify workflow deletion + try { + client().makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + emptyMap(), + null + ) + fail("Workflow found during search") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.NOT_FOUND.status, e.response.statusLine.statusCode) + } + } + + fun `test delete workflow with enable filter by`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + val createdMonitor = createMonitorWithClient( + monitor = randomQueryLevelMonitor(), + client = client(), + rbacRoles = listOf("admin") + ) + + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflow(workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true)) + assertNotNull("The workflow was not created", createdWorkflow) + assertTrue("The workflow was not enabled", createdWorkflow.enabled) + + deleteWorkflow(workflow = createdWorkflow, true) + + // Verify underlying delegates deletion + val search = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", createdMonitor.id)).toString() + // search as "admin" - must get 0 docs + val adminSearchResponse = client().makeRequest( + "POST", + "$ALERTING_BASE_URI/_search", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON) + ) + assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) + + val adminHits = createParser( + XContentType.JSON.xContent(), + adminSearchResponse.entity.content + ).map()["hits"]!! as Map> + val adminDocsFound = adminHits["total"]?.get("value") + assertEquals("Monitor found during search", 0, adminDocsFound) + + // Verify workflow deletion + try { + client().makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/${createdWorkflow.id}", + emptyMap(), + null + ) + fail("Workflow found during search") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.NOT_FOUND.status, e.response.statusLine.statusCode) + } + } + + fun `test delete workflow with enable filter with user that doesn't have delete_monitor cluster privilege failed`() { + enableFilterBy() + if (!isHttps()) { + // if security is disabled and filter by is enabled, we can't create monitor + // refer: `test create monitor with enable filter by` + return + } + createUserWithRoles( + user, + listOf(ALERTING_FULL_ACCESS_ROLE, READALL_AND_MONITOR_ROLE), + listOf(TEST_HR_BACKEND_ROLE, "role2"), + false + ) + + val deleteUser = "deleteUser" + createUserWithTestDataAndCustomRole( + deleteUser, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf("role1", "role3"), + listOf( + getClusterPermissionsFromCustomRole(ALERTING_DELETE_WORKFLOW_ACCESS), + getClusterPermissionsFromCustomRole(ALERTING_GET_WORKFLOW_ACCESS) + ) + ) + val deleteUserClient = SecureRestClientBuilder(clusterHosts.toTypedArray(), isHttps(), deleteUser, deleteUser) + .setSocketTimeout(60000).build() + + try { + val createdMonitor = createMonitorWithClient(userClient!!, monitor = randomQueryLevelMonitor()) + + assertNotNull("The monitor was not created", createdMonitor) + + val createdWorkflow = createWorkflowWithClient( + client = userClient!!, + workflow = randomWorkflow(monitorIds = listOf(createdMonitor.id), enabled = true) + ) + assertNotNull("The workflow was not created", createdWorkflow) + assertTrue("The workflow was not enabled", createdWorkflow.enabled) + + try { + deleteWorkflowWithClient(deleteUserClient, workflow = createdWorkflow, true) + fail("Expected Forbidden exception") + } catch (e: ResponseException) { + assertEquals("Get workflow failed", RestStatus.FORBIDDEN.status, e.response.statusLine.statusCode) + } + patchUserBackendRoles(deleteUser, arrayOf("role2")) + + val response = deleteWorkflowWithClient(deleteUserClient!!, workflow = createdWorkflow, true) + assertEquals("Delete workflow failed", RestStatus.OK, response?.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteUser(deleteUser) + deleteUserClient?.close() + } + } + + fun `test execute workflow with an user with execute workflow access`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_EXECUTE_WORKFLOW_ACCESS) + ) + + val monitor = createRandomMonitor(true) + val workflow = createRandomWorkflow(listOf(monitor.id), true) + + try { + val executeWorkflowResponse = userClient?.makeRequest( + "POST", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}/_execute", + mutableMapOf() + ) + assertEquals("Executing workflow failed", RestStatus.OK, executeWorkflowResponse?.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test execute workflow with an user without execute workflow access`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_NO_ACCESS_ROLE) + ) + + val monitor = createRandomMonitor(true) + val workflow = createRandomWorkflow(listOf(monitor.id), true) + + try { + userClient?.makeRequest( + "POST", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}/_execute", + mutableMapOf() + ) + fail("Expected 403 Method FORBIDDEN response") + } catch (e: ResponseException) { + assertEquals("Execute workflow failed", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test delete workflow with an user with delete workflow access`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_DELETE_WORKFLOW_ACCESS) + ) + + val monitor = createRandomMonitor(true) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + val refresh = true + + try { + val deleteWorkflowResponse = userClient?.makeRequest( + "DELETE", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}?refresh=$refresh", + emptyMap(), + monitor.toHttpEntity() + ) + assertEquals("DELETE workflow failed", RestStatus.OK, deleteWorkflowResponse?.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test delete workflow with deleting delegates with an user with delete workflow access`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_DELETE_WORKFLOW_ACCESS) + ) + + val monitor = createRandomMonitor(true) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + + try { + val deleteWorkflowResponse = deleteWorkflowWithClient( + userClient!!, + workflow, + deleteDelegates = true, + refresh = true + ) + assertEquals("DELETE workflow failed", RestStatus.OK, deleteWorkflowResponse?.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + // Verify delegate deletion + val search = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", monitor.id)).toString() + // search as "admin" - must get 0 docs + val adminSearchResponse = client().makeRequest( + "POST", + "$ALERTING_BASE_URI/_search", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON) + ) + assertEquals("Search monitor failed", RestStatus.OK, adminSearchResponse.restStatus()) + + val adminHits = createParser( + XContentType.JSON.xContent(), + adminSearchResponse.entity.content + ).map()["hits"]!! as Map> + val adminDocsFound = adminHits["total"]?.get("value") + assertEquals("Monitor found during search", 0, adminDocsFound) + } + + fun `test delete workflow with an user without delete monitor access`() { + createUserWithTestDataAndCustomRole( + user, + TEST_HR_INDEX, + TEST_HR_ROLE, + listOf(TEST_HR_BACKEND_ROLE), + getClusterPermissionsFromCustomRole(ALERTING_NO_ACCESS_ROLE) + ) + + val monitor = createRandomMonitor(true) + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + + try { + userClient?.makeRequest( + "DELETE", + "$WORKFLOW_ALERTING_BASE_URI/${workflow.id}?refresh=true", + emptyMap(), + monitor.toHttpEntity() + ) + fail("Expected 403 Method FORBIDDEN response") + } catch (e: ResponseException) { + assertEquals("DELETE workflow failed", RestStatus.FORBIDDEN, e.response.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } + + fun `test admin all access with enable filter by`() { + enableFilterBy() + createUserWithTestData(user, TEST_HR_INDEX, TEST_HR_ROLE, TEST_HR_BACKEND_ROLE) + createUserRolesMapping(ALERTING_FULL_ACCESS_ROLE, arrayOf(user)) + try { + // randomMonitor has a dummy user, api ignores the User passed as part of monitor, it picks user info from the logged-in user. + val monitor = randomQueryLevelMonitor().copy( + inputs = listOf( + SearchInput( + indices = listOf(TEST_HR_INDEX), + query = SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + ) + ) + ) + + val createResponse = userClient?.makeRequest("POST", ALERTING_BASE_URI, emptyMap(), monitor.toHttpEntity()) + assertEquals("Create monitor failed", RestStatus.CREATED, createResponse?.restStatus()) + val monitorJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + createResponse?.entity?.content + ).map() + val monitorId = monitorJson["_id"] as String + + val workflow = randomWorkflow(monitorIds = listOf(monitorId)) + val createWorkflowResponse = userClient?.makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + assertEquals("Create workflow failed", RestStatus.CREATED, createWorkflowResponse?.restStatus()) + + val workflowJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + createWorkflowResponse?.entity?.content + ).map() + + val id: String = workflowJson["_id"] as String + val search = SearchSourceBuilder().query(QueryBuilders.termQuery("_id", id)).toString() + + // get as "admin" - must get 1 docs + val adminGetResponse = client().makeRequest( + "GET", + "$WORKFLOW_ALERTING_BASE_URI/$id", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON) + ) + assertEquals("Get workflow failed", RestStatus.OK, adminGetResponse.restStatus()) + + // delete as "admin" + val adminDeleteResponse = client().makeRequest( + "DELETE", + "$WORKFLOW_ALERTING_BASE_URI/$id", + emptyMap(), + NStringEntity(search, ContentType.APPLICATION_JSON) + ) + assertEquals("Delete workflow failed", RestStatus.OK, adminDeleteResponse.restStatus()) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + deleteRoleMapping(ALERTING_FULL_ACCESS_ROLE) + } + } + + fun `test execute workflow with bucket-level and doc-level chained monitors with user having partial index permissions`() { + createUser(user, arrayOf(TEST_HR_BACKEND_ROLE)) + createTestIndex(TEST_HR_INDEX) + + createIndexRoleWithDocLevelSecurity( + TEST_HR_ROLE, + TEST_HR_INDEX, + TERM_DLS_QUERY, + listOf(ALERTING_INDEX_WORKFLOW_ACCESS, ALERTING_INDEX_MONITOR_ACCESS) + ) + createUserRolesMapping(TEST_HR_ROLE, arrayOf(user)) + + // Add a doc that is accessible to the user + indexDoc( + TEST_HR_INDEX, + "1", + """ + { + "test_field": "a", + "accessible": true + } + """.trimIndent() + ) + + // Add a second doc that is not accessible to the user + indexDoc( + TEST_HR_INDEX, + "2", + """ + { + "test_field": "b", + "accessible": false + } + """.trimIndent() + ) + + indexDoc( + TEST_HR_INDEX, + "3", + """ + { + "test_field": "c", + "accessible": true + } + """.trimIndent() + ) + + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field").field("test_field") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput( + indices = listOf(TEST_HR_INDEX), + query = SearchSourceBuilder().size(0).query(QueryBuilders.matchAllQuery()).aggregation(compositeAgg) + ) + val triggerScript = """ + params.docCount > 0 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null + ), + actions = listOf() + ) + val bucketMonitor = createMonitorWithClient( + userClient!!, + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources(findingsEnabled = true) + ) + ) + assertNotNull("The bucket monitor was not created", bucketMonitor) + + val docQuery1 = DocLevelQuery(query = "test_field:\"a\"", name = "3") + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(DocLevelMonitorInput("description", listOf(TEST_HR_INDEX), listOf(docQuery1))), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)) + ) + val docMonitor = createMonitorWithClient(userClient!!, monitor1)!! + assertNotNull("The doc level monitor was not created", docMonitor) + + val workflow = randomWorkflow(monitorIds = listOf(bucketMonitor.id, docMonitor.id)) + val workflowResponse = createWorkflowWithClient(userClient!!, workflow) + assertNotNull("The workflow was not created", workflowResponse) + + try { + executeWorkflow(workflowId = workflowResponse.id) + val bucketAlerts = searchAlerts(bucketMonitor) + assertEquals("Incorrect number of alerts", 2, bucketAlerts.size) + + val docAlerts = searchAlerts(docMonitor) + assertEquals("Incorrect number of alerts", 1, docAlerts.size) + } finally { + deleteRoleAndRoleMapping(TEST_HR_ROLE) + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt new file mode 100644 index 000000000..82dd701c2 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -0,0 +1,1007 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.opensearch.alerting.ALWAYS_RUN +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.WORKFLOW_ALERTING_BASE_URI +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.randomBucketLevelMonitor +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.alerting.randomQueryLevelMonitor +import org.opensearch.alerting.randomWorkflow +import org.opensearch.alerting.randomWorkflowWithDelegates +import org.opensearch.client.ResponseException +import org.opensearch.commons.alerting.model.ChainedMonitorFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.junit.annotations.TestLogging +import java.time.Instant +import java.util.Collections +import java.util.Locale +import java.util.UUID + +@TestLogging("level:DEBUG", reason = "Debug for tests.") +@Suppress("UNCHECKED_CAST") +class WorkflowRestApiIT : AlertingRestTestCase() { + + fun `test create workflow success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + } + + fun `test create workflow with different monitor types success`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val bucketLevelMonitor = randomBucketLevelMonitor( + inputs = listOf( + SearchInput( + listOf(index), + SearchSourceBuilder().query(QueryBuilders.matchAllQuery()) + .aggregation(TermsAggregationBuilder("test_agg").field("test_field")) + ) + ) + ) + val bucketLevelMonitorResponse = createMonitor(bucketLevelMonitor) + + val workflow = randomWorkflow( + monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val workflowById = getWorkflow(createdId) + assertNotNull(workflowById) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", docLevelMonitorResponse.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", bucketLevelMonitorResponse.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", docLevelMonitorResponse.id, delegate2.chainedMonitorFindings!!.monitorId + ) + } + + fun `test create workflow without delegate failure`() { + val workflow = randomWorkflow( + monitorIds = Collections.emptyList() + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test create workflow duplicate delegate failure`() { + val workflow = randomWorkflow( + monitorIds = listOf("1", "1", "2") + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test create workflow delegate monitor doesn't exist failure`() { + val index = createTestIndex() + val docQuery = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docLevelMonitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf("-1", docLevelMonitorResponse.id) + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test create workflow sequence order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test create workflow chained findings monitor not in sequence failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow chained findings order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } + + fun `test create workflow when monitor index not initialized failure`() { + val delegates = listOf( + Delegate(1, "monitor-1") + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Monitors not found") + ) + } + } + } + + fun `test create workflow delegate and chained finding monitor different indices failure`() { + val index = randomAlphaOfLength(10).lowercase(Locale.ROOT) + createTestIndex(index) + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val docMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docMonitorResponse = createMonitor(docMonitor) + + val index1 = "$index-1" + createTestIndex(index1) + + val docLevelInput1 = DocLevelMonitorInput( + "description", listOf(index1), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + + val docMonitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger) + ) + val docMonitorResponse1 = createMonitor(docMonitor1) + + val workflow = randomWorkflow( + monitorIds = listOf(docMonitorResponse1.id, docMonitorResponse.id) + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegate monitor and it's chained finding monitor must query the same indices") + ) + } + } + } + + fun `test create workflow query monitor chained findings monitor failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val docMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docMonitorResponse = createMonitor(docMonitor) + + val queryMonitor = randomQueryLevelMonitor() + val queryMonitorResponse = createMonitor(queryMonitor) + + val workflow = randomWorkflow( + monitorIds = listOf(queryMonitorResponse.id, docMonitorResponse.id) + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Query level monitor can't be part of chained findings") + ) + } + } + } + + fun `test create workflow with 26 delegates failure`() { + val monitorsIds = mutableListOf() + for (i in 0..25) { + monitorsIds.add(UUID.randomUUID().toString()) + } + val workflow = randomWorkflow( + monitorIds = monitorsIds + ) + try { + createWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be larger then 25.") + ) + } + } + } + + fun `test update workflow add monitor success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse2 = createMonitor(monitor2) + + val updatedWorkflow = randomWorkflow( + id = createdId, + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + + val updateResponse = client().makeRequest("PUT", updatedWorkflow.relativeUrl(), emptyMap(), updatedWorkflow.toHttpEntity()) + + assertEquals("Update workflow failed", RestStatus.OK, updateResponse.restStatus()) + + val updateResponseBody = updateResponse.asMap() + val updatedId = updateResponseBody["_id"] as String + val updatedVersion = updateResponseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, updatedId) + assertTrue("incorrect version", updatedVersion > 0) + + val workflowById = getWorkflow(updatedId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse.id, delegate2.chainedMonitorFindings!!.monitorId + ) + } + + fun `test update workflow remove monitor success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse2 = createMonitor(monitor2) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + var workflowById = getWorkflow(createdId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + var delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val updatedWorkflow = randomWorkflow( + id = createdId, + monitorIds = listOf(monitorResponse.id) + ) + + val updateResponse = client().makeRequest("PUT", updatedWorkflow.relativeUrl(), emptyMap(), updatedWorkflow.toHttpEntity()) + + assertEquals("Update workflow failed", RestStatus.OK, updateResponse.restStatus()) + + val updateResponseBody = updateResponse.asMap() + val updatedId = updateResponseBody["_id"] as String + val updatedVersion = updateResponseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, updatedId) + assertTrue("incorrect version", updatedVersion > 0) + + workflowById = getWorkflow(updatedId) + assertNotNull(workflowById) + // Delegate verification + @Suppress("UNCHECKED_CAST") + delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse.id, delegate1.monitorId) + } + + fun `test update workflow change order of delegate monitors`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse1 = createMonitor(monitor1) + val monitorResponse2 = createMonitor(monitor2) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + + var workflowById = getWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + val updatedWorkflowResponse = updateWorkflow( + randomWorkflow( + id = workflowById.id, + monitorIds = listOf(monitorResponse2.id, monitorResponse1.id) + ) + ) + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse) + assertEquals( + "Workflow id changed", + workflowResponse.id, + updatedWorkflowResponse.id + ) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = getWorkflow(updatedWorkflowResponse.id) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals( + "Workflow name not correct", + updatedWorkflowResponse.name, + workflowById.name + ) + assertEquals( + "Workflow owner not correct", + updatedWorkflowResponse.owner, + workflowById.owner + ) + assertEquals( + "Workflow input not correct", + updatedWorkflowResponse.inputs, + workflowById.inputs + ) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse2.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse1.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse2.id, delegate2.chainedMonitorFindings!!.monitorId + ) + } + + fun `test update workflow doesn't exist failure`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)) + ) + + val monitorResponse1 = createMonitor(monitor1) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id) + ) + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + try { + updateWorkflow(workflow.copy(id = "testId")) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow with testId is not found") + ) + } + } + val updatedWorkflow = updateWorkflow(workflowResponse.copy(enabled = true, enabledTime = Instant.now())) + assertNotNull(updatedWorkflow) + val getWorkflow = getWorkflow(workflowId = updatedWorkflow.id) + assertTrue(getWorkflow.enabled) + } + + fun `test update workflow duplicate delegate failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor) + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflow( + id = workflowResponse.id, + monitorIds = listOf("1", "1", "2") + ) + try { + updateWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test update workflow delegate monitor doesn't exist failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflow( + id = workflowResponse.id, + monitorIds = listOf("-1", monitorResponse.id) + ) + + try { + updateWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test update workflow sequence order not correct failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + workflow = randomWorkflowWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + try { + updateWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test update workflow chained findings monitor not in sequence failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + workflow = randomWorkflowWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + + try { + updateWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test update workflow chained findings order not correct failure`() { + val index = createTestIndex() + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor) + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = createWorkflow(workflow) + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + workflow = randomWorkflowWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + + try { + updateWorkflow(workflow) + } catch (e: ResponseException) { + assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } + + @Throws(Exception::class) + fun `test getting a workflow`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + val storedMonitor = getMonitor(monitor.id) + + assertEquals("Indexed and retrieved monitor differ", monitor, storedMonitor) + + val workflow = createRandomWorkflow(monitorIds = listOf(monitor.id)) + + val storedWorkflow = getWorkflow(workflow.id) + + assertEquals("Indexed and retrieved workflow differ", workflow.id, storedWorkflow.id) + val delegates = (storedWorkflow.inputs[0] as CompositeInput).sequence.delegates + assertEquals("Delegate list not correct", 1, delegates.size) + assertEquals("Delegate order id not correct", 1, delegates[0].order) + assertEquals("Delegate id list not correct", monitor.id, delegates[0].monitorId) + } + + @Throws(Exception::class) + fun `test getting a workflow that doesn't exist`() { + try { + getWorkflow(randomAlphaOfLength(20)) + fail("expected response exception") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + + fun `test delete workflow`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl()) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete workflow delete delegate monitors`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl().plus("?deleteDelegateMonitors=true")) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + + // Verify that delegate monitor is deleted + try { + getMonitor(monitor.id) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Monitor not found.") + ) + } + } + } + + fun `test delete workflow preserve delegate monitors`() { + val query = randomQueryLevelMonitor() + val monitor = createMonitor(query) + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitor.id) + ) + val workflowResponse = createWorkflow(workflowRequest) + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflow(workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + client().makeRequest("DELETE", getWorkflowResponse.relativeUrl().plus("?deleteDelegateMonitors=false")) + + // Verify that the workflow is deleted + try { + getWorkflow(workflowId) + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + + // Verify that delegate monitor is not deleted + val delegateMonitor = getMonitor(monitor.id) + assertNotNull(delegateMonitor) + } + + @Throws(Exception::class) + fun `test deleting a workflow that doesn't exist`() { + try { + client().makeRequest("DELETE", "$WORKFLOW_ALERTING_BASE_URI/foobarbaz") + fail("expected 404 ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } +}