Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] Refactored DeleteMonitor Action to be synchronous #630

Merged
merged 1 commit into from
Nov 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
Expand All @@ -15,6 +19,7 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
Expand All @@ -39,7 +44,9 @@ import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

Expand Down Expand Up @@ -71,7 +78,8 @@ class TransportDeleteMonitorAction @Inject constructor(
if (!validateUserBackendRoles(user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
}
}
Expand All @@ -83,120 +91,73 @@ class TransportDeleteMonitorAction @Inject constructor(
private val user: User?,
private val monitorId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteMonitor()
} else if (!doFilterForUser(user)) {
// security is enabled and filterby is disabled.
deleteMonitor()
} else {
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
suspend fun resolveUserAndStart() {
try {
val monitor = getMonitor()

val canDelete = user == null ||
!doFilterForUser(user) ||
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteMetadata(monitor)
deleteDocLevelMonitorQueries(monitor)
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
)
}
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}

fun start() {
private suspend fun getMonitor(): Monitor {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
)
)
return
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
)
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
onGetResponse(monitor)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
)
}

private fun onGetResponse(monitor: Monitor) {
if (!checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)) {
return
} else {
deleteMonitor()
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
)
)
}
}

private fun deleteMonitor() {
client.delete(
deleteRequest,
object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
val clusterState = clusterService.state()
if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
deleteDocLevelMonitorQueries()
}
deleteMetadata()

actionListener.onResponse(DeleteMonitorResponse(response.id, response.version))
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}

private fun deleteMetadata() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (response.isExists) {
val deleteMetadataRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "$monitorId")
.setRefreshPolicy(deleteRequest.refreshPolicy)
client.delete(
deleteMetadataRequest,
object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
}

override fun onFailure(t: Exception) {
}
}
)
}
}
override fun onFailure(t: Exception) {
}
}
)
private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
return client.suspendUntil { delete(deleteRequest, it) }
}

private fun deleteDocLevelMonitorQueries() {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) {
}
private suspend fun deleteMetadata(monitor: Monitor) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
}

override fun onFailure(t: Exception) {
private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) {
val clusterState = clusterService.state()
if (!clusterState.routingTable.hasIndex(monitor.dataSources.queryIndex)) {
return
}
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(monitor.dataSources.queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
}
)
)
}
}
}
}