Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transport layer classes for getting and deleting the workflow #835

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ class TransportDeleteMonitorAction @Inject constructor(
try {
val monitor = getMonitor()

val canDelete = monitorIsNotInWorkflows(monitor.id) && (
user == null || !doFilterForUser(user) ||
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)
)
val canDelete = user == null || !doFilterForUser(user) ||
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
if (monitorIsWorkflowDelegate(monitor.id)) {
actionListener.onFailure(
AlertingException("Monitor can't be deleted because it is a part of workflow(s)", RestStatus.FORBIDDEN, IllegalStateException())
)
} else if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
Expand All @@ -134,7 +136,7 @@ class TransportDeleteMonitorAction @Inject constructor(
*
* @param monitorId id of monitor that is checked if it is a workflow delegate
*/
private suspend fun monitorIsNotInWorkflows(monitorId: String): Boolean {
private suspend fun monitorIsWorkflowDelegate(monitorId: String): Boolean {
val queryBuilder = QueryBuilders.nestedQuery(
Workflow.WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
Expand All @@ -145,16 +147,25 @@ class TransportDeleteMonitorAction @Inject constructor(
),
ScoreMode.None
)
try {
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder))

val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchSource(true))
client.threadPool().threadContext.stashContext().use {
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
if (searchResponse.hits.totalHits?.value == 0L) {
return false
}

val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }
if (searchResponse.hits.totalHits?.value == 0L) {
return true
val workflowIds = searchResponse.hits.hits.map { it.id }.joinToString()
log.info("Monitor $monitorId can't be deleted since it belongs to $workflowIds")
return true
}
} catch (ex: Exception) {
log.error("Error getting the monitor workflows", ex)
throw AlertingException.wrap(ex)
}
return false
}

private suspend fun getMonitor(): Monitor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.opensearchapi.addFilter
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
Expand Down Expand Up @@ -54,8 +55,6 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)

/**
* Transport class that deletes the workflow.
* If the deleteDelegateMonitor flag is set to true, deletes the workflow delegates that are not part of another workflow
Expand All @@ -72,6 +71,8 @@ class TransportDeleteWorkflowAction @Inject constructor(
),
SecureTransportAction {

private val log = LogManager.getLogger(javaClass)

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
Expand Down Expand Up @@ -126,10 +127,11 @@ class TransportDeleteWorkflowAction @Inject constructor(

if (canDelete) {
val deleteResponse = deleteWorkflow(workflow)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we attempt to delete monitors first and then delete the workflow? And only delete everything if the user has permission to delete everything?

deleteMetadata(workflow)
// TODO - uncomment once the metadata are introduced
// deleteMetadata(workflow)
if (deleteDelegateMonitors == true) {
val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds()
val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds)
val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds, user)

// Delete the monitor ids
if (monitorIdsToBeDeleted.isNotEmpty()) {
Expand Down Expand Up @@ -177,7 +179,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
* @param workflowIdToBeDeleted Id of the workflow that should be deleted
* @param monitorIds List of delegate monitor ids (underlying monitor ids)
*/
private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List<String>): List<String> {
private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List<String>, user: User?): List<String> {
// Retrieve monitors belonging to another workflows
val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to check if the user has permissions to these monitor with the backend roles too as the monitors could get modified later and the user might not have the backend roles to access those monitors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaahh got it! Good point

QueryBuilders.nestedQuery(
Expand All @@ -194,7 +196,12 @@ class TransportDeleteWorkflowAction @Inject constructor(

val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchSource(true))
.source(SearchSourceBuilder().query(queryBuilder))

// Check if user can access the monitors (since the monitors could get modified later and the user might not have the backend roles to access the monitors)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: remove extra space ( ) between Check and if

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

if (user != null && filterByEnabled) {
addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword")
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a SecureWorkflowMonitorIT test case to test this part of code (both with & without) backend roles.


val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

Expand Down Expand Up @@ -239,7 +246,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
log.debug("Deleting the workflow with id ${deleteRequest.id()}")
return client.suspendUntil { delete(deleteRequest, it) }
}

// TODO - use once the workflow metadata concept is introduced
private suspend fun deleteMetadata(workflow: Workflow) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add debug logs like "deleting metadata",

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok this part is copy pasted - even we don't have workflow metada. will comment out the call and add TODO - since we are going to introduce the metadata with workflow execution. Tnx!

val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try catch for failure in deletion and log error and re-throw wrapped exception

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.get.GetRequest
Expand Down Expand Up @@ -43,6 +44,8 @@ class TransportGetWorkflowAction @Inject constructor(
),
SecureTransportAction {

private val log = LogManager.getLogger(javaClass)

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
Expand All @@ -64,6 +67,7 @@ class TransportGetWorkflowAction @Inject constructor(
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error log

log.error("Workflow with ${getWorkflowRequest.workflowId} not found")
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
Expand Down Expand Up @@ -110,6 +114,8 @@ class TransportGetWorkflowAction @Inject constructor(
}

override fun onFailure(t: Exception) {
log.error("Getting the workflow failed", t)

if (t is IndexNotFoundException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error log

actionListener.onFailure(
OpenSearchStatusException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() {
e.message?.let {
assertTrue(
"Exception not returning DeleteMonitor Action error ",
it.contains("Not allowed to delete this monitor!")
it.contains("Monitor can't be deleted because it is a part of workflow(s)")
)
}
}
Expand Down