diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 1186ca20b..815c79c22 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -44,6 +44,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction +import org.opensearch.alerting.transport.TransportDeleteWorkflowAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction import org.opensearch.alerting.transport.TransportGetAlertsAction import org.opensearch.alerting.transport.TransportGetDestinationsAction @@ -51,6 +52,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction +import org.opensearch.alerting.transport.TransportGetWorkflowAction import org.opensearch.alerting.transport.TransportIndexMonitorAction import org.opensearch.alerting.transport.TransportIndexWorkflowAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction @@ -194,7 +196,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java), - ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java) + ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java) ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index e5f71133d..277691e66 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest @@ -42,6 +43,7 @@ import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.core.xcontent.NamedXContentRegistry @@ -106,9 +108,12 @@ class TransportDeleteMonitorAction @Inject constructor( suspend fun resolveUserAndStart() { try { val monitor = getMonitor() - - val canDelete = user == null || - !doFilterForUser(user) || + if (monitorIsWorkflowDelegate(monitor.id)) { + actionListener.onFailure( + AlertingException("Monitor can't be deleted because it is a part of workflow(s)", RestStatus.FORBIDDEN, IllegalStateException()) + ) + } + val canDelete = user == null || !doFilterForUser(user) || checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) if (canDelete) { @@ -125,6 +130,43 @@ class TransportDeleteMonitorAction @Inject constructor( } } + /** + * Checks if the monitor is part of the workflow + * + * @param monitorId id of monitor that is checked if it is a workflow delegate + */ + private suspend fun monitorIsWorkflowDelegate(monitorId: String): Boolean { + val queryBuilder = QueryBuilders.nestedQuery( + Workflow.WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.matchQuery( + Workflow.WORKFLOW_MONITOR_PATH, + monitorId + ) + ), + ScoreMode.None + ) + try { + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder)) + + client.threadPool().threadContext.stashContext().use { + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + if (searchResponse.hits.totalHits?.value == 0L) { + return false + } + + val workflowIds = searchResponse.hits.hits.map { it.id }.joinToString() + log.info("Monitor $monitorId can't be deleted since it belongs to $workflowIds") + return true + } + } catch (ex: Exception) { + log.error("Error getting the monitor workflows", ex) + throw AlertingException.wrap(ex) + } + } + private suspend fun getMonitor(): Monitor { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt new file mode 100644 index 000000000..ccedfb17b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -0,0 +1,268 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.alerting.opensearchapi.addFilter +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.AlertingPluginInterface +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest +import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) +/** + * Transport class that deletes the workflow. + * If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow + */ +class TransportDeleteWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val clusterService: ClusterService, + settings: Settings, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest +), + SecureTransportAction { + + private val log = LogManager.getLogger(javaClass) + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + val transformedRequest = request as? DeleteWorkflowRequest + ?: recreateObject(request) { DeleteWorkflowRequest(it) } + + val user = readUserFromThreadContext(client) + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + scope.launch { + DeleteWorkflowHandler( + client, + actionListener, + deleteRequest, + transformedRequest.deleteDelegateMonitors, + user, + transformedRequest.workflowId + ).resolveUserAndStart() + } + } + + inner class DeleteWorkflowHandler( + private val client: Client, + private val actionListener: ActionListener, + private val deleteRequest: DeleteRequest, + private val deleteDelegateMonitors: Boolean?, + private val user: User?, + private val workflowId: String + ) { + suspend fun resolveUserAndStart() { + try { + val workflow = getWorkflow() + + val canDelete = user == null || + !doFilterForUser(user) || + checkUserPermissionsWithResource( + user, + workflow.user, + actionListener, + "workflow", + workflowId + ) + + if (canDelete) { + val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds() + + // User can only delete the delegate monitors only in the case if all monitors can be deleted + // Partial monitor deletion is not available + if (deleteDelegateMonitors == true) { + val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds, user) + val monitorsDiff = delegateMonitorIds.toMutableList() + monitorsDiff.removeAll(monitorIdsToBeDeleted) + + if (monitorsDiff.isNotEmpty()) { + actionListener.onFailure( + AlertingException( + "Not allowed to delete ${monitorsDiff.joinToString()} monitors", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + return + } + } + + val deleteResponse = deleteWorkflow(workflow) + if (deleteDelegateMonitors == true) { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + } + actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) + } else { + actionListener.onFailure( + AlertingException( + "Not allowed to delete this workflow!", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + } + } catch (t: Exception) { + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + } + + private suspend fun deleteMonitors(monitorIds: List, refreshPolicy: RefreshPolicy) { + if (monitorIds.isEmpty()) + return + + for (monitorId in monitorIds) { + val deleteRequest = DeleteMonitorRequest(monitorId, refreshPolicy) + val searchResponse: DeleteMonitorResponse = client.suspendUntil { + AlertingPluginInterface.deleteMonitor(this as NodeClient, deleteRequest, it) + } + } + } + + /** + * Returns lit of monitor ids belonging only to a given workflow + * @param workflowIdToBeDeleted Id of the workflow that should be deleted + * @param monitorIds List of delegate monitor ids (underlying monitor ids) + */ + private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List, user: User?): List { + // Retrieve monitors belonging to another workflows + val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter( + QueryBuilders.nestedQuery( + Workflow.WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.termsQuery( + Workflow.WORKFLOW_MONITOR_PATH, + monitorIds + ) + ), + ScoreMode.None + ) + ) + + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder)) + + // Check if user can access the monitors(since the monitors could get modified later and the user might not have the backend roles to access the monitors) + if (user != null && filterByEnabled) { + addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") + } + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + + val workflows = searchResponse.hits.hits.map { hit -> + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, XContentType.JSON + ).also { it.nextToken() } + lateinit var workflow: Workflow + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow" -> workflow = Workflow.parse(xcp) + } + } + workflow.copy(id = hit.id, version = hit.version) + } + val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct() + // Monitors that can be deleted -> all workflow delegates - monitors belonging to different workflows + return monitorIds.minus(workflowMonitors.toSet()) + } + + private suspend fun getWorkflow(): Workflow { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId) + + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + if (getResponse.isExists == false) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Workflow not found.", RestStatus.NOT_FOUND) + ) + ) + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow + } + + private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse { + log.debug("Deleting the workflow with id ${deleteRequest.id()}") + return client.suspendUntil { delete(deleteRequest, it) } + } + // TODO - use once the workflow metadata concept is introduced + private suspend fun deleteMetadata(workflow: Workflow) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata") + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt new file mode 100644 index 000000000..e09f1a91a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.index.IndexNotFoundException +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +class TransportGetWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val clusterService: ClusterService, + settings: Settings +) : HandledTransportAction( + AlertingActions.GET_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::GetWorkflowRequest +), + SecureTransportAction { + + private val log = LogManager.getLogger(javaClass) + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener) { + val user = readUserFromThreadContext(client) + + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + client.threadPool().threadContext.stashContext().use { + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + log.error("Workflow with ${getWorkflowRequest.workflowId} not found") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + ) + return + } + + var workflow: Workflow? = null + if (!response.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow + + // security is enabled and filterby is enabled + if (!checkUserPermissionsWithResource( + user, + workflow?.user, + actionListener, + "workflow", + getWorkflowRequest.workflowId + ) + ) { + return + } + } + } + + actionListener.onResponse( + GetWorkflowResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + workflow + ) + ) + } + + override fun onFailure(t: Exception) { + log.error("Getting the workflow failed", t) + + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + } + ) + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 7363a5e7b..c1e98450e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -397,6 +397,257 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test get workflow`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = upsertWorkflow(workflowRequest)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + assertNotNull(getWorkflowResponse) + + val workflowById = getWorkflowResponse.workflow!! + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, getWorkflowResponse.id) + assertTrue("incorrect version", getWorkflowResponse.version > 0) + assertEquals("Workflow name not correct", workflowRequest.name, workflowById.name) + assertEquals("Workflow owner not correct", workflowRequest.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflowRequest.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate = delegates[0] + assertNotNull(delegate) + assertEquals("Delegate order not correct", 1, delegate.order) + assertEquals("Delegate id not correct", monitorResponse.id, delegate.monitorId) + } + + fun `test get workflow for invalid id monitor index doesn't exist`() { + // Get workflow for non existing workflow id + try { + getWorkflowById(id = "-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found") + ) + } + } + } + + fun `test get workflow for invalid id monitor index exists`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + createMonitor(monitor) + // Get workflow for non existing workflow id + try { + getWorkflowById(id = "-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found") + ) + } + } + } + + fun `test delete workflow delegate monitor deleted`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + deleteWorkflow(workflowId, true) + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + // Verify that the monitor is deleted + try { + getMonitorResponse(monitorResponse.id) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetMonitor Action error ", + it.contains("Monitor not found") + ) + } + } + } + + fun `test delete workflow delegate monitor not deleted`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + val workflowRequest2 = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse2 = upsertWorkflow(workflowRequest2)!! + val workflowId2 = workflowResponse2.id + val getWorkflowResponse2 = getWorkflowById(id = workflowResponse2.id) + + assertNotNull(getWorkflowResponse2) + assertEquals(workflowId2, getWorkflowResponse2.id) + + try { + deleteWorkflow(workflowId, true) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("[Not allowed to delete ${monitorResponse.id} monitors") + ) + } + } + val existingMonitor = getMonitorResponse(monitorResponse.id) + assertNotNull(existingMonitor) + } + + fun `test trying to delete monitor that is part of workflow sequence`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + // Verify that the monitor can't be deleted because it's included in the workflow + try { + deleteMonitor(monitorResponse.id) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning DeleteMonitor Action error ", + it.contains("Monitor can't be deleted because it is a part of workflow(s)") + ) + } + } + } + + fun `test delete workflow for invalid id monitor index doesn't exists`() { + // Try deleting non-existing workflow + try { + deleteWorkflow("-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning DeleteWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + + fun `test delete workflow for invalid id monitor index exists`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + createMonitor(monitor) + // Try deleting non-existing workflow + try { + deleteWorkflow("-1") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning DeleteWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + } + fun `test create workflow without delegate failure`() { val workflow = randomWorkflow( monitorIds = Collections.emptyList() @@ -664,7 +915,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val queryMonitor = randomQueryLevelMonitor() val queryMonitorResponse = createMonitor(queryMonitor)!! - var workflow = randomWorkflow( + val workflow = randomWorkflow( monitorIds = listOf(queryMonitorResponse.id, docMonitorResponse.id) ) try { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index ccd1a3c1d..749b71555 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -9,6 +9,9 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.support.WriteRequest import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.model.ScheduledJob @@ -72,4 +75,17 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { return client().execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, request).actionGet() } + protected fun getWorkflowById(id: String): GetWorkflowResponse { + return client().execute( + AlertingActions.GET_WORKFLOW_ACTION_TYPE, + GetWorkflowRequest(id, RestRequest.Method.GET) + ).get() + } + + protected fun deleteWorkflow(workflowId: String, deleteDelegateMonitors: Boolean? = null) { + client().execute( + AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, + DeleteWorkflowRequest(workflowId, deleteDelegateMonitors) + ).get() + } }