diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt new file mode 100644 index 000000000..0985750ff --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -0,0 +1,287 @@ + +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.addFilter +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.opensearchapi.withClosableContext +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.AlertingPluginInterface +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest +import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.action.DeleteWorkflowRequest +import org.opensearch.commons.alerting.action.DeleteWorkflowResponse +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.util.UUID + +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) +/** + * Transport class that deletes the workflow. + * If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow + */ +class TransportDeleteWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val clusterService: ClusterService, + val settings: Settings, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest +), + SecureTransportAction { + + private val log = LogManager.getLogger(javaClass) + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + val transformedRequest = request as? DeleteWorkflowRequest + ?: recreateObject(request) { DeleteWorkflowRequest(it) } + + val user = readUserFromThreadContext(client) + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId) + .setRefreshPolicy(RefreshPolicy.IMMEDIATE) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + scope.launch { + DeleteWorkflowHandler( + client, + actionListener, + deleteRequest, + transformedRequest.deleteDelegateMonitors, + user, + transformedRequest.workflowId + ).resolveUserAndStart() + } + } + + inner class DeleteWorkflowHandler( + private val client: Client, + private val actionListener: ActionListener, + private val deleteRequest: DeleteRequest, + private val deleteDelegateMonitors: Boolean?, + private val user: User?, + private val workflowId: String + ) { + suspend fun resolveUserAndStart() { + try { + val workflow = getWorkflow() + + val canDelete = user == null || + !doFilterForUser(user) || + checkUserPermissionsWithResource( + user, + workflow.user, + actionListener, + "workflow", + workflowId + ) + + if (canDelete) { + val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds() + + // User can only delete the delegate monitors only in the case if all monitors can be deleted + // Partial monitor deletion is not available + if (deleteDelegateMonitors == true) { + val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds, user) + val monitorsDiff = delegateMonitorIds.toMutableList() + monitorsDiff.removeAll(monitorIdsToBeDeleted) + + if (monitorsDiff.isNotEmpty()) { + actionListener.onFailure( + AlertingException( + "Not allowed to delete ${monitorsDiff.joinToString()} monitors", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + return + } + } + + val deleteResponse = deleteWorkflow(workflow) + if (deleteDelegateMonitors == true) { + if (user == null) { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + } else { + // Un-stash the context + withClosableContext( + InjectorContextElement( + user.name.plus(UUID.randomUUID().toString()), + settings, + client.threadPool().threadContext, + user.roles, + user + ) + ) { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + } + } + } + actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) + } else { + actionListener.onFailure( + AlertingException( + "Not allowed to delete this workflow!", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + } + } catch (t: Exception) { + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + } + + private suspend fun deleteMonitors(monitorIds: List, refreshPolicy: RefreshPolicy) { + if (monitorIds.isEmpty()) + return + + for (monitorId in monitorIds) { + val deleteRequest = DeleteMonitorRequest(monitorId, refreshPolicy) + val searchResponse: DeleteMonitorResponse = client.suspendUntil { + AlertingPluginInterface.deleteMonitor(this as NodeClient, deleteRequest, it) + } + } + } + + /** + * Returns lit of monitor ids belonging only to a given workflow + * @param workflowIdToBeDeleted Id of the workflow that should be deleted + * @param monitorIds List of delegate monitor ids (underlying monitor ids) + */ + private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List, user: User?): List { + // Retrieve monitors belonging to another workflows + val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter( + QueryBuilders.nestedQuery( + Workflow.WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.termsQuery( + Workflow.WORKFLOW_MONITOR_PATH, + monitorIds + ) + ), + ScoreMode.None + ) + ) + + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder)) + + // Check if user can access the monitors(since the monitors could get modified later and the user might not have the backend roles to access the monitors) + if (user != null && filterByEnabled) { + addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") + } + + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + + val workflows = searchResponse.hits.hits.map { hit -> + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, XContentType.JSON + ).also { it.nextToken() } + lateinit var workflow: Workflow + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow" -> workflow = Workflow.parse(xcp) + } + } + workflow.copy(id = hit.id, version = hit.version) + } + val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct() + // Monitors that can be deleted -> all workflow delegates - monitors belonging to different workflows + return monitorIds.minus(workflowMonitors.toSet()) + } + + private suspend fun getWorkflow(): Workflow { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId) + + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + if (getResponse.isExists == false) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Workflow not found.", RestStatus.NOT_FOUND) + ) + ) + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow + } + + private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse { + log.debug("Deleting the workflow with id ${deleteRequest.id()}") + return client.suspendUntil { delete(deleteRequest, it) } + } + // TODO - use once the workflow metadata concept is introduced + private suspend fun deleteMetadata(workflow: Workflow) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata") + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt new file mode 100644 index 000000000..68df71e4a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetWorkflowRequest +import org.opensearch.commons.alerting.action.GetWorkflowResponse +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.index.IndexNotFoundException +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService + +class TransportGetWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val clusterService: ClusterService, + settings: Settings +) : HandledTransportAction( + AlertingActions.GET_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::GetWorkflowRequest +), + SecureTransportAction { + + private val log = LogManager.getLogger(javaClass) + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener) { + val user = readUserFromThreadContext(client) + + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + client.threadPool().threadContext.stashContext().use { + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + log.error("Workflow with ${getWorkflowRequest.workflowId} not found") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + ) + return + } + + var workflow: Workflow? = null + if (!response.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version) + if (compositeMonitor is Workflow) { + workflow = compositeMonitor + } else { + log.error("Wrong monitor type returned") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + ) + return + } + + // security is enabled and filterby is enabled + if (!checkUserPermissionsWithResource( + user, + workflow?.user, + actionListener, + "workflow", + getWorkflowRequest.workflowId + ) + ) { + return + } + } + } + + actionListener.onResponse( + GetWorkflowResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + workflow + ) + ) + } + + override fun onFailure(t: Exception) { + log.error("Getting the workflow failed", t) + + if (t is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Workflow not found", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + } + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index bc8f59b15..5fdd4490a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.alerting.transport import kotlinx.coroutines.CoroutineScope @@ -24,8 +29,10 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT @@ -34,6 +41,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEO import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils +import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.isQueryLevelMonitor import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService @@ -66,6 +74,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +import java.util.UUID import java.util.stream.Collectors private val log = LogManager.getLogger(TransportIndexWorkflowAction::class.java) @@ -131,8 +140,7 @@ class TransportIndexWorkflowAction @Inject constructor( ) { if (transformedRequest.rbacRoles?.stream()?.anyMatch { !user.backendRoles.contains(it) } == true) { log.error( - "User specified backend roles, ${transformedRequest.rbacRoles}, " + - "that they don' have access to. User backend roles: ${user.backendRoles}" + "User specified backend roles, ${transformedRequest.rbacRoles}, that they don' have access to. User backend roles: ${user.backendRoles}" ) actionListener.onFailure( AlertingException.wrap( @@ -145,8 +153,7 @@ class TransportIndexWorkflowAction @Inject constructor( return } else if (transformedRequest.rbacRoles?.isEmpty() == true) { log.error( - "Non-admin user are not allowed to specify an empty set of backend roles. " + - "Please don't pass in the parameter or pass in at least one backend role." + "Non-admin user are not allowed to specify an empty set of backend roles. Please don't pass in the parameter or pass in at least one backend role." ) actionListener.onFailure( AlertingException.wrap( @@ -159,14 +166,27 @@ class TransportIndexWorkflowAction @Inject constructor( return } } + scope.launch { try { - validateRequest(client, transformedRequest, user) + validateMonitorAccess( + transformedRequest, + user, + client, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + // Stash the context and start the workflow creation + client.threadPool().threadContext.stashContext().use { + IndexWorkflowHandler(client, actionListener, transformedRequest, user).resolveUserAndStart() + } + } - // If the validation was successful - continue with the execution - client.threadPool().threadContext.stashContext().use { - IndexWorkflowHandler(client, actionListener, transformedRequest, user).resolveUserAndStart() - } + override fun onFailure(e: Exception) { + log.error("Error indexing workflow", e) + actionListener.onFailure(e) + } + } + ) } catch (e: Exception) { log.error("Failed to create workflow", e) if (e is IndexNotFoundException) { @@ -183,45 +203,11 @@ class TransportIndexWorkflowAction @Inject constructor( } } - /** - * Validates the request in several steps - * Checks if the user has appropriate backend roles and if he can access the given monitors and it's indices - */ - private suspend fun validateRequest(client: Client, request: IndexWorkflowRequest, user: User?) { - if (request.workflow.inputs.isEmpty()) - throw AlertingException.wrap(IllegalArgumentException("Input list can not be empty.")) - - if (request.workflow.inputs.size > 1) - throw AlertingException.wrap(IllegalArgumentException("Input list can contain only one element.")) - - if (request.workflow.inputs[0] !is CompositeInput) - throw AlertingException.wrap(IllegalArgumentException("When creating a workflow input must be CompositeInput")) - - val compositeInput = request.workflow.inputs[0] as CompositeInput - val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) - - if (monitorIds.isNullOrEmpty()) - throw AlertingException.wrap(IllegalArgumentException("Delegates list can not be empty.")) - - validateDuplicateDelegateMonitorReferenceExists(monitorIds) - validateSequenceOrdering(compositeInput.sequence.delegates) - validateChainedMonitorFindings(compositeInput.sequence.delegates) - - val monitors = getDelegateMonitors(user, monitorIds) - - validateDelegateMonitorsExist(monitorIds, monitors) - validateChainedMonitorFindingsMonitors(compositeInput.sequence.delegates, monitors) - - val indices = getMonitorIndices(monitors) - - validateIndicesAccess(indices, client) - } - inner class IndexWorkflowHandler( private val client: Client, private val actionListener: ActionListener, private val request: IndexWorkflowRequest, - private val user: User? + private val user: User?, ) { fun resolveUserAndStart() { scope.launch { @@ -318,7 +304,8 @@ class TransportIndexWorkflowAction @Inject constructor( actionListener.onFailure( AlertingException.wrap( OpenSearchStatusException( - "Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged", RestStatus.INTERNAL_SERVER_ERROR + "Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged", + RestStatus.INTERNAL_SERVER_ERROR ) ) ) @@ -357,7 +344,12 @@ class TransportIndexWorkflowAction @Inject constructor( val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .source( + request.workflow.toXContentWithUser( + jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) .setIfSeqNo(request.seqNo) .setIfPrimaryTerm(request.primaryTerm) .timeout(indexTimeout) @@ -368,7 +360,12 @@ class TransportIndexWorkflowAction @Inject constructor( if (failureReasons != null) { log.error("Failed to create workflow: $failureReasons") actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + failureReasons.toString(), + indexResponse.status() + ) + ) ) return } @@ -391,7 +388,10 @@ class TransportIndexWorkflowAction @Inject constructor( if (!getResponse.isExists) { actionListener.onFailure( AlertingException.wrap( - OpenSearchStatusException("Workflow with ${request.workflowId} is not found", RestStatus.NOT_FOUND) + OpenSearchStatusException( + "Workflow with ${request.workflowId} is not found", + RestStatus.NOT_FOUND + ) ) ) return @@ -408,7 +408,14 @@ class TransportIndexWorkflowAction @Inject constructor( } private suspend fun onGetResponse(currentWorkflow: Workflow) { - if (!checkUserPermissionsWithResource(user, currentWorkflow.user, actionListener, "workfklow", request.workflowId)) { + if (!checkUserPermissionsWithResource( + user, + currentWorkflow.user, + actionListener, + "workfklow", + request.workflowId + ) + ) { return } @@ -441,14 +448,22 @@ class TransportIndexWorkflowAction @Inject constructor( // rolesToRemove: these are the backend roles to remove from the monitor val rolesToRemove = user.backendRoles - request.rbacRoles.orEmpty() // remove the monitor's roles with rolesToRemove and add any roles passed into the request.rbacRoles - val updatedRbac = currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty() + val updatedRbac = + currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty() request.workflow = request.workflow.copy( user = User(user.name, updatedRbac, user.roles, user.customAttNames) ) } } else { request.workflow = request.workflow - .copy(user = User(user.name, currentWorkflow.user!!.backendRoles, user.roles, user.customAttNames)) + .copy( + user = User( + user.name, + currentWorkflow.user!!.backendRoles, + user.roles, + user.customAttNames + ) + ) } log.debug("Update workflow backend roles to: ${request.workflow.user?.backendRoles}") } @@ -456,7 +471,12 @@ class TransportIndexWorkflowAction @Inject constructor( request.workflow = request.workflow.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .source( + request.workflow.toXContentWithUser( + jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) .id(request.workflowId) .setIfSeqNo(request.seqNo) .setIfPrimaryTerm(request.primaryTerm) @@ -467,7 +487,12 @@ class TransportIndexWorkflowAction @Inject constructor( val failureReasons = checkShardsFailure(indexResponse) if (failureReasons != null) { actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + failureReasons.toString(), + indexResponse.status() + ) + ) ) return } @@ -494,55 +519,58 @@ class TransportIndexWorkflowAction @Inject constructor( } } - private fun validateChainedMonitorFindings(delegates: List) { - val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } - delegates.forEach { - if (it.chainedMonitorFindings != null) { - if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) { - throw AlertingException.wrap( - IllegalArgumentException( - "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence" - ) - ) - } - if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) { - throw AlertingException.wrap( - IllegalArgumentException( - "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}" - ) - ) - } - } - } - } - private fun validateChainedMonitorFindingsMonitors(delegates: List, monitorDelegates: List) { + infix fun List.equalsIgnoreOrder(other: List) = + this.size == other.size && this.toSet() == other.toSet() + val monitorsById = monitorDelegates.associateBy { it.id } delegates.forEach { + val delegateMonitor = monitorsById[it.monitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Delegate monitor ${it.monitorId} doesn't exist") + ) if (it.chainedMonitorFindings != null) { - val chainedFindingMonitor = monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( - IllegalArgumentException("Chained finding monitor doesn't exist") - ) + val chainedFindingMonitor = + monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Chained finding monitor doesn't exist") + ) if (chainedFindingMonitor.isQueryLevelMonitor()) { throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings")) } + + val delegateMonitorIndices = getMonitorIndices(delegateMonitor) + + val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor) + + if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) { + throw AlertingException.wrap(IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices")) + } } } } - private fun validateSequenceOrdering(delegates: List) { - val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) - if (orderSet.size != delegates.size) { - throw AlertingException.wrap(IllegalArgumentException("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) - } - } + /** + * Returns list of indices for the given monitor depending on it's type + */ + private fun getMonitorIndices(monitor: Monitor): List { + return when (monitor.monitorType) { + Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices } + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + if (isADMonitor(monitor)) monitor.inputs.flatMap { s -> (s as SearchInput).indices } + else { + val indices = mutableListOf() + for (input in monitor.inputs) { + when (input) { + is SearchInput -> indices.addAll(input.indices) + else -> indices + } + } + indices + } + } - private fun validateDuplicateDelegateMonitorReferenceExists( - monitorIds: MutableList - ) { - if (monitorIds.toSet().size != monitorIds.size) { - throw AlertingException.wrap(IllegalArgumentException("Duplicate delegates not allowed")) + else -> emptyList() } } @@ -560,30 +588,28 @@ class TransportIndexWorkflowAction @Inject constructor( } /** - * Returns monitors for the given ids if user has an access + * Validates monitor and indices access + * 1. Validates the monitor access (if the filterByEnabled is set to true - adds backend role filter) as admin + * 2. Unstashes the context and checks if the user can access the monitor indices */ - private suspend fun getDelegateMonitors( + private suspend fun validateMonitorAccess( + request: IndexWorkflowRequest, user: User?, - monitorIds: MutableList - ): List { + client: Client, + actionListener: ActionListener + ) { + val compositeInput = request.workflow.inputs[0] as CompositeInput + val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) val query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", monitorIds)) val searchSource = SearchSourceBuilder().query(query) - val monitorSearchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) - // TODO - Add secure tests once the Rest Action is created - if (user != null && filterByEnabled) { - addFilter(user, monitorSearchRequest.source(), "monitor.user.backend_roles.keyword") + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) + + if (user != null && !isAdmin(user) && filterByEnabled) { + addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") } - val searchMonitorResponse: SearchResponse = client.suspendUntil { client.search(monitorSearchRequest, it) } + val searchMonitorResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } - if (searchMonitorResponse.status() != RestStatus.OK) { - throw AlertingException.wrap( - OpenSearchStatusException( - "User doesn't have read permissions for one or more configured monitors ${monitorIds.joinToString()}", - RestStatus.FORBIDDEN - ) - ) - } if (searchMonitorResponse.isTimedOut) { throw OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") } @@ -597,7 +623,77 @@ class TransportIndexWorkflowAction @Inject constructor( monitors.add(monitor) } } - return monitors + if (monitors.isEmpty()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured monitors ${monitorIds.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + ) + return + } + // Validate delegates and it's chained findings + try { + validateDelegateMonitorsExist(monitorIds, monitors) + validateChainedMonitorFindingsMonitors(compositeInput.sequence.delegates, monitors) + } catch (e: Exception) { + actionListener.onFailure(e) + return + } + val indices = getMonitorIndices(monitors) + + val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) + .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) + + if (user == null) { + checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) + } else { + // Unstash the context and check if user with specified roles has indices access + withClosableContext( + InjectorContextElement( + user.name.plus(UUID.randomUUID().toString()), + settings, + client.threadPool().threadContext, + user.roles, + user + ) + ) { + checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) + } + } + } + + /** + * Checks if the client can access the given indices + */ + private fun checkIndicesAccess( + client: Client, + indicesSearchRequest: SearchRequest?, + indices: MutableList, + actionListener: ActionListener, + ) { + client.search( + indicesSearchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse?) { + actionListener.onResponse(AcknowledgedResponse(true)) + } + + override fun onFailure(e: Exception) { + log.error("Error accessing the monitor indices", e) + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured index ${indices.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + ) + } + } + ) } /** @@ -615,25 +711,4 @@ class TransportIndexWorkflowAction @Inject constructor( } return indices } - - /** - * Checks if the user can access the monitor indices - */ - private suspend fun validateIndicesAccess( - indices: MutableList, - client: Client, - ) { - val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) - .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) - - val indicesSearchResponse: SearchResponse = client.suspendUntil { client.search(indicesSearchRequest, it) } - if (indicesSearchResponse.status() != RestStatus.OK) { - throw AlertingException.wrap( - OpenSearchStatusException( - "User doesn't have read permissions for one or more configured index ${indices.joinToString()}", - RestStatus.FORBIDDEN - ) - ) - } - } }