Skip to content

Commit

Permalink
refactored DeleteMonitor Action to be synchronious (#628) (#630)
Browse files Browse the repository at this point in the history
* refactored DeleteMonitor Action to be synchronious

Signed-off-by: Petar Dzepina <[email protected]>

Signed-off-by: Petar Dzepina <[email protected]>
Co-authored-by: Petar Dzepina <[email protected]>
  • Loading branch information
2 people authored and sbcd90 committed Nov 2, 2022
1 parent ec6c768 commit 57f6aa3
Showing 1 changed file with 63 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
Expand All @@ -15,6 +19,7 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
Expand All @@ -39,7 +44,9 @@ import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java)

Expand Down Expand Up @@ -71,7 +78,8 @@ class TransportDeleteMonitorAction @Inject constructor(
if (!validateUserBackendRoles(user, actionListener)) {
return
}
client.threadPool().threadContext.stashContext().use {

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteMonitorAction")) {
DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart()
}
}
Expand All @@ -83,120 +91,73 @@ class TransportDeleteMonitorAction @Inject constructor(
private val user: User?,
private val monitorId: String
) {

fun resolveUserAndStart() {
if (user == null) {
// Security is disabled, so we can delete the destination without issues
deleteMonitor()
} else if (!doFilterForUser(user)) {
// security is enabled and filterby is disabled.
deleteMonitor()
} else {
try {
start()
} catch (ex: IOException) {
actionListener.onFailure(AlertingException.wrap(ex))
suspend fun resolveUserAndStart() {
try {
val monitor = getMonitor()

val canDelete = user == null ||
!doFilterForUser(user) ||
checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)

if (canDelete) {
val deleteResponse = deleteMonitor(monitor)
deleteMetadata(monitor)
deleteDocLevelMonitorQueries(monitor)
actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException())
)
}
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}

fun start() {
private suspend fun getMonitor(): Monitor {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (!response.isExists) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
)
)
return
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
)
val monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
onGetResponse(monitor)
}
override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
)
}

private fun onGetResponse(monitor: Monitor) {
if (!checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId)) {
return
} else {
deleteMonitor()
val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Monitor with $monitorId is not found", RestStatus.NOT_FOUND)
)
)
}
}

private fun deleteMonitor() {
client.delete(
deleteRequest,
object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
val clusterState = clusterService.state()
if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
deleteDocLevelMonitorQueries()
}
deleteMetadata()

actionListener.onResponse(DeleteMonitorResponse(response.id, response.version))
}

override fun onFailure(t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor
}

private fun deleteMetadata() {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId)
client.get(
getRequest,
object : ActionListener<GetResponse> {
override fun onResponse(response: GetResponse) {
if (response.isExists) {
val deleteMetadataRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "$monitorId")
.setRefreshPolicy(deleteRequest.refreshPolicy)
client.delete(
deleteMetadataRequest,
object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
}

override fun onFailure(t: Exception) {
}
}
)
}
}
override fun onFailure(t: Exception) {
}
}
)
private suspend fun deleteMonitor(monitor: Monitor): DeleteResponse {
return client.suspendUntil { delete(deleteRequest, it) }
}

private fun deleteDocLevelMonitorQueries() {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) {
}
private suspend fun deleteMetadata(monitor: Monitor) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata")
val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
}

override fun onFailure(t: Exception) {
private suspend fun deleteDocLevelMonitorQueries(monitor: Monitor) {
val clusterState = clusterService.state()
if (!clusterState.routingTable.hasIndex(monitor.dataSources.queryIndex)) {
return
}
val response: BulkByScrollResponse = suspendCoroutine { cont ->
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(monitor.dataSources.queryIndex)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.refresh(true)
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) = cont.resume(response)
override fun onFailure(t: Exception) = cont.resumeWithException(t)
}
}
)
)
}
}
}
}

0 comments on commit 57f6aa3

Please sign in to comment.