diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt index b7067f3b0..07668e8ea 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt @@ -5,13 +5,15 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse -import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters @@ -20,6 +22,7 @@ import org.opensearch.alerting.action.GetMonitorAction import org.opensearch.alerting.action.GetMonitorRequest import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.action.GetMonitorResponse.AssociatedWorkflow +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -43,6 +46,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetMonitorAction @Inject constructor( transportService: TransportService, @@ -118,18 +122,24 @@ class TransportGetMonitorAction @Inject constructor( } } } - - actionListener.onResponse( - GetMonitorResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - monitor, - getAssociatedWorkflows(response.id) - ) - ) + try { + scope.launch { + val associatedCompositeMonitors = getAssociatedWorkflows(response.id) + actionListener.onResponse( + GetMonitorResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + monitor, + associatedCompositeMonitors + ) + ) + } + } catch (e: Exception) { + log.error("Failed to get associate workflows in get monitor action", e) + } } override fun onFailure(t: Exception) { @@ -140,7 +150,7 @@ class TransportGetMonitorAction @Inject constructor( } } - private fun getAssociatedWorkflows(id: String): List { + private suspend fun getAssociatedWorkflows(id: String): List { try { val associatedWorkflows = mutableListOf() val queryBuilder = QueryBuilders.nestedQuery( @@ -156,7 +166,7 @@ class TransportGetMonitorAction @Inject constructor( val searchRequest = SearchRequest() .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) - val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get() + val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } for (hit in response.hits) { XContentType.JSON.xContent().createParser(