From 1a06bb5b0589ddca30d247da31283a6b508b5673 Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Tue, 1 Nov 2022 01:12:09 +0100 Subject: [PATCH] refactored DeleteMonitor Action to be synchronious (#628) * refactored DeleteMonitor Action to be synchronious Signed-off-by: Petar Dzepina --- .../transport/TransportDeleteMonitorAction.kt | 165 +++++++----------- 1 file changed, 63 insertions(+), 102 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 755073b92..ab57a0d45 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -5,6 +5,10 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener @@ -15,6 +19,7 @@ import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -39,7 +44,9 @@ import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import java.io.IOException +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) @@ -71,7 +78,8 @@ class TransportDeleteMonitorAction @Inject constructor( if (!validateUserBackendRoles(user, actionListener)) { return } - client.threadPool().threadContext.stashContext().use { + + GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) { DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart() } } @@ -83,120 +91,73 @@ class TransportDeleteMonitorAction @Inject constructor( private val user: User?, private val monitorId: String ) { - - fun resolveUserAndStart() { - if (user == null) { - // Security is disabled, so we can delete the destination without issues - deleteMonitor() - } else if (!doFilterForUser(user)) { - // security is enabled and filterby is disabled. - deleteMonitor() - } else { - try { - start() - } catch (ex: IOException) { - actionListener.onFailure(AlertingException.wrap(ex)) + suspend fun resolveUserAndStart() { + try { + val monitor = getMonitor() + + val canDelete = user == null || + !doFilterForUser(user) || + checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) + + if (canDelete) { + val deleteResponse = deleteMonitor(monitor) + deleteMetadata(monitor) + deleteDocLevelMonitorQueries(monitor) + actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) + } else { + actionListener.onFailure( + AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException()) + ) } + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) } } - fun start() { + private suspend fun getMonitor(): Monitor { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) - ) - ) - return - } - val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON - ) - val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor - onGetResponse(monitor) - } - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } - ) - } - private fun onGetResponse(monitor: Monitor) { - if (!checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)) { - return - } else { - deleteMonitor() + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + if (getResponse.isExists == false) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) + ) + ) } - } - - private fun deleteMonitor() { - client.delete( - deleteRequest, - object : ActionListener { - override fun onResponse(response: DeleteResponse) { - val clusterState = clusterService.state() - if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { - deleteDocLevelMonitorQueries() - } - deleteMetadata() - - actionListener.onResponse(DeleteMonitorResponse(response.id, response.version)) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON ) + return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } - private fun deleteMetadata() { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (response.isExists) { - val deleteMetadataRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "$monitorId") - .setRefreshPolicy(deleteRequest.refreshPolicy) - client.delete( - deleteMetadataRequest, - object : ActionListener { - override fun onResponse(response: DeleteResponse) { - } - - override fun onFailure(t: Exception) { - } - } - ) - } - } - override fun onFailure(t: Exception) { - } - } - ) + private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse { + return client.suspendUntil { delete(deleteRequest, it) } } - private fun deleteDocLevelMonitorQueries() { - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) { - } + private suspend fun deleteMetadata(monitor: Monitor) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + } - override fun onFailure(t: Exception) { + private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) { + val clusterState = clusterService.state() + if (!clusterState.routingTable.hasIndex(monitor.dataSources.queryIndex)) { + return + } + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(monitor.dataSources.queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) } - } - ) + ) + } } } }