From 3018802b26f946a8c22361ad5d544f3d16c72051 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sat, 10 Feb 2024 00:36:44 +0000 Subject: [PATCH] add shard disribution logic in doc-level monitors --- .../alerting/DocumentLevelMonitorRunner.kt | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 1a027942f..696e75ec9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException @@ -85,6 +88,8 @@ import java.util.concurrent.atomic.AtomicLong import java.util.stream.Collectors import kotlin.math.max +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + object DocumentLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) @@ -279,7 +284,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { matchingDocIdsPerIndex?.get(concreteIndexName), ) - val shards = indexUpdatedRunContext.keys + val shards = indexUpdatedRunContext.keys.toMutableSet() shards.remove("index") shards.remove("shards_count") @@ -311,19 +316,30 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val listener = GroupedActionListener( object : ActionListener> { override fun onResponse(response: Collection) { - logger.info("hit here1") + scope.launch { + val updatedLastRunContext: MutableMap> = mutableMapOf() + response.forEach { + it.lastRunContexts.forEach { lastRunContext -> + updatedLastRunContext[lastRunContext.key] = lastRunContext.value as MutableMap + } + } + MonitorMetadataService.upsertMetadata( + monitorMetadata.copy(lastRunContext = updatedLastRunContext), + true + ) + } } override fun onFailure(e: Exception) { - logger.info("hit here2") + logger.error(e) } }, - nodeMap.size + nodeShardAssignments.size ) val responseReader = Writeable.Reader { DocLevelMonitorFanOutResponse(it) } - for (node in nodeMap) { + for (node in nodeShardAssignments) { val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( node.key, monitor, @@ -334,7 +350,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { workflowRunContext ) transportService!!.sendRequest( - node.value, + nodeMap[node.key], DocLevelMonitorFanOutAction.NAME, docLevelMonitorFanOutRequest, TransportRequestOptions.EMPTY, @@ -351,11 +367,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } } - - MonitorMetadataService.upsertMetadata( - monitorMetadata.copy(lastRunContext = updatedLastRunContext), - true - ) // TODO: Update the Document as part of the Trigger and return back the trigger action result return monitorResult.copy(triggerResults = emptyMap()) } catch (e: Exception) {