From 06b7b028491d4037b6f558c66a0c7f55abb0dbe7 Mon Sep 17 00:00:00 2001 From: Ronnak Saxena Date: Tue, 12 Sep 2023 13:29:33 -0700 Subject: [PATCH] removed countdown latch and added coroutines Signed-off-by: Ronnak Saxena --- .../rollup/interceptor/ResponseInterceptor.kt | 100 +++++------------- 1 file changed, 29 insertions(+), 71 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt index eb0a4e9db..19b6bccdc 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/ResponseInterceptor.kt @@ -5,8 +5,12 @@ package org.opensearch.indexmanagement.rollup.interceptor +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager -import org.opensearch.action.ActionListener import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.client.Client @@ -24,6 +28,7 @@ import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.internal.ShardSearchRequest import org.opensearch.search.query.QuerySearchResult import org.opensearch.index.query.QueryBuilders +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.util.convertDateStringToEpochMillis import org.opensearch.indexmanagement.rollup.util.convertFixedIntervalStringToMs import org.opensearch.indexmanagement.rollup.util.getRollupJobs @@ -45,7 +50,6 @@ import org.opensearch.transport.TransportRequestOptions import org.opensearch.transport.TransportResponse import org.opensearch.transport.TransportResponseHandler import java.time.ZonedDateTime -import java.util.concurrent.CountDownLatch import kotlin.math.max import kotlin.math.min @@ -54,7 +58,8 @@ class ResponseInterceptor( val settings: Settings, val indexNameExpressionResolver: IndexNameExpressionResolver, val client: Client -) : TransportInterceptor { +) : TransportInterceptor, + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("Rollup Response Interceptor")) { private val logger = LogManager.getLogger(javaClass) override fun interceptSender(sender: TransportInterceptor.AsyncSender): TransportInterceptor.AsyncSender { return CustomAsyncSender(sender) @@ -123,7 +128,7 @@ class ResponseInterceptor( // Calculated the end time for the current shard index if it is a rollup index with data overlapp @Suppress("SpreadOperator") - fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array, dateTargetField: String): Long { + suspend fun getRollupEndTime(liveDataStartPoint: Long, rollupIndices: Array, dateTargetField: String): Long { // Build search request to find the maximum rollup timestamp <= liveDataStartPoint val sort = SortBuilders.fieldSort("$dateTargetField.date_histogram").order(SortOrder.DESC) val query = QueryBuilders.boolQuery() @@ -136,23 +141,7 @@ class ResponseInterceptor( val req = SearchRequest() .source(searchSourceBuilder) .indices(*rollupIndices) - var res: SearchResponse? = null - val latch = CountDownLatch(1) - client.search( - req, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - res = searchResponse - latch.countDown() - } - - override fun onFailure(e: Exception) { - logger.error("request to find intersection time failed :(", e) - latch.countDown() - } - } - ) - latch.await() + val res = client.suspendUntil { search(req, it) } try { return res!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long } catch (e: Exception) { @@ -164,7 +153,7 @@ class ResponseInterceptor( // Returns Pair(startRange: Long, endRange: Long) // Note startRange is inclusive and endRange is exclusive, they are Longs because the type is epoch milliseconds @Suppress("LongMethod", "SpreadOperator") - fun findOverlap(response: QuerySearchResult): Pair { + suspend fun findOverlap(response: QuerySearchResult): Pair { val job: Rollup = getRollupJob(response)!! // maybe throw a try catch later var dateSourceField: String = "" var dateTargetField: String = "" @@ -189,27 +178,11 @@ class ResponseInterceptor( .query(oldQuery) .size(1) // Need to avoid infinite interceptor loop - val maxRolledDateRequest = SearchRequest() + val maxRollupDateRequest = SearchRequest() .source(searchSourceBuilder) .indices(*rollupIndices) // add all rollup indices to this request - var maxRolledDateResponse: SearchResponse? = null - var latch = CountDownLatch(1) - logger.info("ronsax sending request to find max rollup time for index: $shardRequestIndex") - client.search( - maxRolledDateRequest, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - maxRolledDateResponse = searchResponse - latch.countDown() - } - - override fun onFailure(e: Exception) { - logger.error("maxLiveDate request failed in response interceptor", e) - latch.countDown() - } - } - ) - latch.await() + logger.info("Sending maxRollupDate request for $shardRequestIndex") + val maxRollupDateResponse: SearchResponse? = client.suspendUntil { search(maxRollupDateRequest, it) } // Build search request to find the minimum date in all live indices sort = SortBuilders.fieldSort(dateSourceField).order(SortOrder.ASC) searchSourceBuilder = SearchSourceBuilder() @@ -227,32 +200,15 @@ class ResponseInterceptor( } else { // shard index is live index minLiveDateRequest.indices(shardRequestIndex) } - - var minLiveDateResponse: SearchResponse? = null - latch = CountDownLatch(1) - logger.info("ronsax sending request to find minLiveData for index: $shardRequestIndex") - client.search( - minLiveDateRequest, - object : ActionListener { - override fun onResponse(searchResponse: SearchResponse) { - minLiveDateResponse = searchResponse - latch.countDown() - } - - override fun onFailure(e: Exception) { - logger.error("minLiveDate request failed in response interceptor", e) - latch.countDown() - } - } - ) - latch.await() - val foundMinAndMax = (minLiveDateResponse != null && maxRolledDateResponse != null) + logger.info("Sending minLiveData request for $shardRequestIndex") + var minLiveDateResponse: SearchResponse? = client.suspendUntil { search(minLiveDateRequest, it) } + val foundMinAndMax = (minLiveDateResponse != null && maxRollupDateResponse != null) // if they overlap find part to exclude - if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRolledDateResponse!!.hits.hits.isNotEmpty()) { + if (foundMinAndMax && minLiveDateResponse!!.hits.hits.isNotEmpty() && maxRollupDateResponse!!.hits.hits.isNotEmpty()) { // Rollup data ends at maxRolledDate + fixedInterval - val maxRolledDate: Long = maxRolledDateResponse!!.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long + val maxRolledDate: Long = maxRollupDateResponse.hits.hits[0].sourceAsMap.get("$dateTargetField.date_histogram") as Long val rollupDataEndPoint = maxRolledDate + convertFixedIntervalStringToMs(fixedInterval = rollupInterval!!) - val minLiveDate = minLiveDateResponse!!.hits.hits[0].sourceAsMap.get("$dateSourceField") as String + val minLiveDate = minLiveDateResponse.hits.hits[0].sourceAsMap.get("$dateSourceField") as String val liveDataStartPoint = convertDateStringToEpochMillis(minLiveDate) // If intersection found on rollup index, remove overlap if ((liveDataStartPoint < rollupDataEndPoint) && isShardIndexRollup) { @@ -355,8 +311,8 @@ class ResponseInterceptor( // Store the running values of the aggregations being computed // {aggName: String: Pair} val aggValues = mutableMapOf>() -// -// // Iterate through each aggregation and bucket + + // Iterate through each aggregation and bucket val interceptorAgg = intervalAggregations.asMap().get("interceptor_interval_data") as InternalDateHistogram for (bucket in interceptorAgg.buckets) { val zdt = bucket.key as ZonedDateTime @@ -409,11 +365,13 @@ class ResponseInterceptor( // live index is QuerySearchResult -> { if (response.hasAggs() && isRewrittenInterceptorRequest(response)) { - // Check for overlap - val (startTime, endTime) = findOverlap(response) - // Modify agg to be original result without overlap computed in - response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime)) - originalHandler?.handleResponse(response) + launch { + // Check for overlap + val (startTime, endTime) = findOverlap(response) + // Modify agg to be original result without overlap computed in + response.aggregations(computeAggregationsWithoutOverlap(response.aggregations().expand(), startTime, endTime)) + originalHandler?.handleResponse(response) + } } else { originalHandler?.handleResponse(response) }