diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 30d485eba..dd40ae100 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -72,6 +72,7 @@ import org.opensearch.commons.notifications.model.NotificationConfigInfo import org.opensearch.core.action.ActionListener import org.opensearch.core.common.Strings import org.opensearch.core.common.bytes.BytesReference +import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder @@ -137,6 +138,7 @@ class TransportDocLevelMonitorFanOutAction var percolateQueriesTimeTaken = AtomicLong(0) var totalDocsQueried = AtomicLong(0) var docTransformTimeTaken = AtomicLong(0) + val shardIdFailureMap = mutableMapOf() val updatedIndexNames = request.indexExecutionContexts[0].updatedIndexNames val concreteIndexNames = request.indexExecutionContexts[0].concreteIndexNames val monitorMetadata = request.monitorMetadata @@ -193,7 +195,8 @@ class TransportDocLevelMonitorFanOutAction nonPercolateSearchesTimeTaken, percolateQueriesTimeTaken, totalDocsQueried, - docTransformTimeTaken + docTransformTimeTaken, + shardIdFailureMap, ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo } @@ -276,7 +279,7 @@ class TransportDocLevelMonitorFanOutAction nodeId = monitorCtx.clusterService!!.localNode().id, executionId = request.executionId, monitorId = monitor.id, - shardIdFailureMap = emptyMap(), + shardIdFailureMap = shardIdFailureMap, findingIds = emptyList(), lastRunContext as MutableMap, InputRunResults(listOf(inputRunResults)), @@ -403,6 +406,7 @@ class TransportDocLevelMonitorFanOutAction percolateQueriesTimeTaken: AtomicLong, totalDocsQueried: AtomicLong, docTransformTimeTake: AtomicLong, + shardIdFailureMap: MutableMap, updateLastRunContext: (String, String) -> Unit, ) { val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int @@ -476,6 +480,12 @@ class TransportDocLevelMonitorFanOutAction "Error: ${e.message}", e ) + val s = ShardId( + indexExecutionCtx.concreteIndexName, + monitorCtx.clusterService!!.state().metadata.index(indexExecutionCtx.concreteIndexName).indexUUID, + i // shard id + ) + shardIdFailureMap[s.toString()] = e if (e is IndexClosedException) { throw e }