diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt index 3955dbac8..73c9960d2 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt @@ -5,8 +5,12 @@ package org.opensearch.indexmanagement.rollup.interceptor +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager -import org.opensearch.action.ActionListener import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.client.Client @@ -24,6 +28,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.internal.ShardSearchRequest import org.opensearch.search.query.QuerySearchResult import org.opensearch.index.query.QueryBuilders +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.util.convertDateStringToEpochMillis import org.opensearch.indexmanagement.rollup.util.convertFixedIntervalStringToMs import org.opensearch.indexmanagement.rollup.util.getRollupJobs @@ -45,7 +50,6 @@ import org.opensearch.transport.TransportRequestOptions import org.opensearch.transport.TransportResponse import org.opensearch.transport.TransportResponseHandler import java.time.ZonedDateTime -import java.util.concurrent.CountDownLatch import kotlin.math.max import kotlin.math.min @@ -54,7 +58,8 @@ class ResponseInterceptor( val settings: Settings, val indexNameExpressionResolver: IndexNameExpressionResolver, val client: Client -) : TransportInterceptor { +) : TransportInterceptor, + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("Rollup Response Interceptor")) { private val logger = LogManager.getLogger(javaClass) override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender { return CustomAsyncSender(sender) @@ -123,7 +128,7 @@ class ResponseInterceptor( // Calculated the end time for the current shard index if it is a rollup index with data overlapp @Suppress("SpreadOperator") - fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array, dateTargetField: String, shardRequestIndex: String): Long { + suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array, dateTargetField: String, shardRequestIndex: String): Long { // Build search request to find the maximum rollup timestamp <= liveDataStartPoint val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) val query = QueryBuilders.boolQuery() @@ -136,24 +141,9 @@ class ResponseInterceptor( val req = SearchRequest() .source(searchSourceBuilder) .indices(*rollupIndices) - var res: SearchResponse? = null - val latch = CountDownLatch(1) - client.search( - req, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - res = searchResponse - latch.countDown() - logger.info("ronsax find rollup endtime for index $shardRequestIndex latch is ${latch.count}") - } + logger.info("ronsax sending search request for endtiem of $shardRequestIndex") + val res = client.suspendUntil { search(req, it) } - override fun onFailure(e: Exception) { - logger.error("request to find intersection time failed :(", e) - latch.countDown() - } - } - ) - latch.await() try { return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long } catch (e: Exception) { @@ -165,7 +155,7 @@ class ResponseInterceptor( // Returns Pair(startRange: Long, endRange: Long) // Note startRange is inclusive and endRange is exclusive, they are Longs because the type is epoch milliseconds @Suppress("LongMethod", "SpreadOperator") - fun findOverlap(response: QuerySearchResult): Pair { + suspend fun findOverlap(response: QuerySearchResult): Pair { val job: Rollup = getRollupJob(response)!! // maybe throw a try catch later var dateSourceField: String = "" var dateTargetField: String = "" @@ -193,25 +183,9 @@ class ResponseInterceptor( val maxRolledDateRequest = SearchRequest() .source(searchSourceBuilder) .indices(*rollupIndices) // add all rollup indices to this request - var maxRolledDateResponse: SearchResponse? = null - var latch = CountDownLatch(1) - logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex latch is ${latch.count}") - client.search( - maxRolledDateRequest, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - maxRolledDateResponse = searchResponse - latch.countDown() - logger.info("ronsax found max rollup time for index: $shardRequestIndex latch is ${latch.count}") - } + logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex") + var maxRolledDateResponse: SearchResponse? = client.suspendUntil { search(maxRolledDateRequest, it) } - override fun onFailure(e: Exception) { - logger.error("maxLiveDate request failed in response interceptor", e) - latch.countDown() - } - } - ) - latch.await() // Build search request to find the minimum date in all live indices sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC) searchSourceBuilder = SearchSourceBuilder() @@ -230,32 +204,15 @@ class ResponseInterceptor( minLiveDateRequest.indices(shardRequestIndex) } - var minLiveDateResponse: SearchResponse? = null - latch = CountDownLatch(1) - logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex latch is ${latch.count}") - client.search( - minLiveDateRequest, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - minLiveDateResponse = searchResponse - latch.countDown() - logger.info("ronsax found minLiveData for index: $shardRequestIndex latch is ${latch.count}") - } - - override fun onFailure(e: Exception) { - logger.error("minLiveDate request failed in response interceptor", e) - latch.countDown() - } - } - ) - latch.await() + logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex") + var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) } val foundMinAndMax = (minLiveDateResponse != null && maxRolledDateResponse != null) // if they overlap find part to exclude if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRolledDateResponse!!.hits.hits.isNotEmpty()) { // Rollup data ends at maxRolledDate + fixedInterval - val maxRolledDate: Long = maxRolledDateResponse!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long + val maxRolledDate: Long = maxRolledDateResponse.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long val rollupDataEndPoint = maxRolledDate + convertFixedIntervalStringToMs(fixedInterval = rollupInterval!!) - val minLiveDate = minLiveDateResponse!!.hits.hits[0].sourceAsMap.get("$dateSourceField") as String + val minLiveDate = minLiveDateResponse.hits.hits[0].sourceAsMap.get("$dateSourceField") as String val liveDataStartPoint = convertDateStringToEpochMillis(minLiveDate) // If intersection found on rollup index, remove overlap if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) { @@ -412,11 +369,13 @@ class ResponseInterceptor( // live index is QuerySearchResult -> { if (response.hasAggs() && isRewrittenInterceptorRequest(response)) { - // Check for overlap - val (startTime, endTime) = findOverlap(response) - // Modify agg to be original result without overlap computed in - response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime)) - originalHandler?.handleResponse(response) + launch { + // Check for overlap + val (startTime, endTime) = findOverlap(response) + // Modify agg to be original result without overlap computed in + response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime)) + originalHandler?.handleResponse(response) + } } else { originalHandler?.handleResponse(response) }