diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index b5ba20b02..74c23c337 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -5,6 +5,10 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener @@ -18,6 +22,7 @@ import org.opensearch.alerting.action.DeleteMonitorAction import org.opensearch.alerting.action.DeleteMonitorRequest import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -36,7 +41,9 @@ import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import java.io.IOException +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) @@ -66,7 +73,8 @@ class TransportDeleteMonitorAction @Inject constructor( if (!validateUserBackendRoles(user, actionListener)) { return } - client.threadPool().threadContext.stashContext().use { + + GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) { DeleteMonitorHandler(client, actionListener, deleteRequest, user, request.monitorId).resolveUserAndStart() } } @@ -78,120 +86,73 @@ class TransportDeleteMonitorAction @Inject constructor( private val user: User?, private val monitorId: String ) { - - fun resolveUserAndStart() { - if (user == null) { - // Security is disabled, so we can delete the destination without issues - deleteMonitor() - } else if (!doFilterForUser(user)) { - // security is enabled and filterby is disabled. - deleteMonitor() - } else { - try { - start() - } catch (ex: IOException) { - actionListener.onFailure(AlertingException.wrap(ex)) + suspend fun resolveUserAndStart() { + try { + val monitor = getMonitor() + + val canDelete = user == null || + !doFilterForUser(user) || + checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) + + if (canDelete) { + val deleteResponse = deleteMonitor(monitor) + deleteMetadata(monitor) + deleteDocLevelMonitorQueries(monitor) + actionListener.onResponse(deleteResponse) + } else { + actionListener.onFailure( + AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException()) + ) } + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) } } - fun start() { + private suspend fun getMonitor(): Monitor { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) - ) - ) - return - } - val xcp = XContentHelper.createParser( - xContentRegistry, LoggingDeprecationHandler.INSTANCE, - response.sourceAsBytesRef, XContentType.JSON - ) - val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor - onGetResponse(monitor) - } - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } - ) - } - private fun onGetResponse(monitor: Monitor) { - if (!checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)) { - return - } else { - deleteMonitor() + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + if (getResponse.isExists == false) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND) + ) + ) } - } - - private fun deleteMonitor() { - client.delete( - deleteRequest, - object : ActionListener { - override fun onResponse(response: DeleteResponse) { - val clusterState = clusterService.state() - if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { - deleteDocLevelMonitorQueries() - } - deleteMetadata() - - actionListener.onResponse(response) - } - - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } - } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON ) + return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } - private fun deleteMetadata() { - val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (response.isExists) { - val deleteMetadataRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "$monitorId") - .setRefreshPolicy(deleteRequest.refreshPolicy) - client.delete( - deleteMetadataRequest, - object : ActionListener { - override fun onResponse(response: DeleteResponse) { - } - - override fun onFailure(t: Exception) { - } - } - ) - } - } - override fun onFailure(t: Exception) { - } - } - ) + private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse { + return client.suspendUntil { delete(deleteRequest, it) } } - private fun deleteDocLevelMonitorQueries() { - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) { - } + private suspend fun deleteMetadata(monitor: Monitor) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + } - override fun onFailure(t: Exception) { + private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) { + val clusterState = clusterService.state() + if (!clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + return + } + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) } - } - ) + ) + } } } } diff --git a/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip b/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip new file mode 100644 index 000000000..29afea2cd Binary files /dev/null and b/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip differ diff --git a/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.3.0.0.zip b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.3.0.0.zip new file mode 100644 index 000000000..38788c299 Binary files /dev/null and b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.3.0.0.zip differ diff --git a/alerting/src/test/resources/notifications/opensearch-notifications-2.3.0.0.zip b/alerting/src/test/resources/notifications/opensearch-notifications-2.3.0.0.zip new file mode 100644 index 000000000..4dfd822a9 Binary files /dev/null and b/alerting/src/test/resources/notifications/opensearch-notifications-2.3.0.0.zip differ