Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transport layer classes for getting and deleting the workflow #835

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
Expand Down Expand Up @@ -196,7 +198,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java)
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
Expand Down Expand Up @@ -42,6 +43,7 @@ import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand Down Expand Up @@ -107,11 +109,14 @@ class TransportDeleteMonitorAction @Inject constructor(
try {
val monitor = getMonitor()

val canDelete = user == null ||
!doFilterForUser(user) ||
val canDelete = user == null || !doFilterForUser(user) ||
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
if (monitorIsWorkflowDelegate(monitor.id)) {
actionListener.onFailure(
AlertingException("Monitor can't be deleted because it is a part of workflow(s)", RestStatus.FORBIDDEN, IllegalStateException())
)
} else if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
Expand All @@ -126,6 +131,43 @@ class TransportDeleteMonitorAction @Inject constructor(
}
}

/**
* Checks if the monitor is part of the workflow
*
* @param monitorId id of monitor that is checked if it is a workflow delegate
*/
private suspend fun monitorIsWorkflowDelegate(monitorId: String): Boolean {
val queryBuilder = QueryBuilders.nestedQuery(
Workflow.WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery(
Workflow.WORKFLOW_MONITOR_PATH,
monitorId
)
),
ScoreMode.None
)
try {
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder))

client.threadPool().threadContext.stashContext().use {
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
if (searchResponse.hits.totalHits?.value == 0L) {
return false
}

val workflowIds = searchResponse.hits.hits.map { it.id }.joinToString()
log.info("Monitor $monitorId can't be deleted since it belongs to $workflowIds")
return true
}
} catch (ex: Exception) {
log.error("Error getting the monitor workflows", ex)
throw AlertingException.wrap(ex)
}
}

private suspend fun getMonitor(): Monitor {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
/**
* Transport class that deletes the workflow.
* If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow
*/
class TransportDeleteWorkflowAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
SecureTransportAction {

private val log = LogManager.getLogger(javaClass)

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
listenFilterBySettingChange(clusterService)
}

override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener<DeleteWorkflowResponse>) {
val transformedRequest = request as? DeleteWorkflowRequest
?: recreateObject(request) { DeleteWorkflowRequest(it) }

val user = readUserFromThreadContext(client)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId)
.setRefreshPolicy(RefreshPolicy.IMMEDIATE)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


if (!validateUserBackendRoles(user, actionListener)) {
return
}

scope.launch {
DeleteWorkflowHandler(
client,
actionListener,
deleteRequest,
transformedRequest.deleteDelegateMonitors,
user,
transformedRequest.workflowId
).resolveUserAndStart()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need make resolveUserAndStart a suspend function? is there any advantage? i see TransportIndexMonitorAction not doing it. https://github.com/opensearch-project/alerting/blob/main/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt#L247

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteMonitors and deleteWorkflow are suspend functions - both are called by resolveUserAndStart

}
}

inner class DeleteWorkflowHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteWorkflowResponse>,
private val deleteRequest: DeleteRequest,
private val deleteDelegateMonitors: Boolean?,
private val user: User?,
private val workflowId: String
) {
suspend fun resolveUserAndStart() {
try {
val workflow = getWorkflow()

val canDelete = user == null ||
!doFilterForUser(user) ||
checkUserPermissionsWithResource(
user,
workflow.user,
actionListener,
"workflow",
workflowId
)

if (canDelete) {
val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds()

// User can only delete the delegate monitors only in the case if all monitors can be deleted
// Partial monitor deletion is not available
if (deleteDelegateMonitors == true) {
val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds, user)
val monitorsDiff = delegateMonitorIds.toMutableList()
monitorsDiff.removeAll(monitorIdsToBeDeleted)

if (monitorsDiff.isNotEmpty()) {
actionListener.onFailure(
AlertingException(
"Not allowed to delete ${monitorsDiff.joinToString()} monitors",
RestStatus.FORBIDDEN,
IllegalStateException()
)
)
return
}
}

val deleteResponse = deleteWorkflow(workflow)
if (deleteDelegateMonitors == true) {
deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE)
}
actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException(
"Not allowed to delete this workflow!",
RestStatus.FORBIDDEN,
IllegalStateException()
)
)
}
} catch (t: Exception) {
if (t is IndexNotFoundException) {
actionListener.onFailure(
OpenSearchStatusException(
"Workflow not found.",
RestStatus.NOT_FOUND
)
)
} else {
actionListener.onFailure(AlertingException.wrap(t))
}
}
}

private suspend fun deleteMonitors(monitorIds: List<String>, refreshPolicy: RefreshPolicy) {
if (monitorIds.isEmpty())
return

for (monitorId in monitorIds) {
val deleteRequest = DeleteMonitorRequest(monitorId, refreshPolicy)
val searchResponse: DeleteMonitorResponse = client.suspendUntil {
AlertingPluginInterface.deleteMonitor(this as NodeClient, deleteRequest, it)
}
}
}

/**
* Returns lit of monitor ids belonging only to a given workflow
* @param workflowIdToBeDeleted Id of the workflow that should be deleted
* @param monitorIds List of delegate monitor ids (underlying monitor ids)
*/
private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List<String>, user: User?): List<String> {
// Retrieve monitors belonging to another workflows
val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check if the user has permissions to these monitor with the backend roles too as the monitors could get modified later and the user might not have the backend roles to access those monitors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaahh got it! Good point

QueryBuilders.nestedQuery(
Workflow.WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.termsQuery(
Workflow.WORKFLOW_MONITOR_PATH,
monitorIds
)
),
ScoreMode.None
)
)

val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder))

// Check if user can access the monitors(since the monitors could get modified later and the user might not have the backend roles to access the monitors)
if (user != null && filterByEnabled) {
addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a SecureWorkflowMonitorIT test case to test this part of code (both with & without) backend roles.


val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

val workflows = searchResponse.hits.hits.map { hit ->
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
hit.sourceRef, XContentType.JSON
).also { it.nextToken() }
lateinit var workflow: Workflow
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
xcp.nextToken()
when (xcp.currentName()) {
"workflow" -> workflow = Workflow.parse(xcp)
}
}
workflow.copy(id = hit.id, version = hit.version)
}
val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct()
// Monitors that can be deleted -> all workflow delegates - monitors belonging to different workflows
return monitorIds.minus(workflowMonitors.toSet())
}

private suspend fun getWorkflow(): Workflow {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId)

val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Workflow not found.", RestStatus.NOT_FOUND)
)
)
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow
}

private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse {
log.debug("Deleting the workflow with id ${deleteRequest.id()}")
return client.suspendUntil { delete(deleteRequest, it) }
}
// TODO - use once the workflow metadata concept is introduced
private suspend fun deleteMetadata(workflow: Workflow) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add debug logs like "deleting metadata",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this part is copy pasted - even we don't have workflow metada. will comment out the call and add TODO - since we are going to introduce the metadata with workflow execution. Tnx!

val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try catch for failure in deletion and log error and re-throw wrapped exception

}
}
}
Loading