Skip to content

Commit

Permalink
Merge pull request #5 from sbcd90/fan_out4
Browse files Browse the repository at this point in the history
add shard disribution logic in doc-level monitors
  • Loading branch information
eirsep authored Feb 10, 2024
2 parents 1282553 + 3018802 commit 52a04bf
Showing 1 changed file with 22 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
Expand Down Expand Up @@ -85,6 +88,8 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collectors
import kotlin.math.max

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

object DocumentLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)

Expand Down Expand Up @@ -279,7 +284,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
matchingDocIdsPerIndex?.get(concreteIndexName),
)

val shards = indexUpdatedRunContext.keys
val shards = indexUpdatedRunContext.keys.toMutableSet()
shards.remove("index")
shards.remove("shards_count")

Expand Down Expand Up @@ -311,19 +316,30 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
scope.launch {
val updatedLastRunContext: MutableMap<String, MutableMap<String, Any>> = mutableMapOf()
response.forEach {
it.lastRunContexts.forEach { lastRunContext ->
updatedLastRunContext[lastRunContext.key] = lastRunContext.value as MutableMap<String, Any>
}
}
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}
}

override fun onFailure(e: Exception) {
logger.info("hit here2")
logger.error(e)
}
},
nodeMap.size
nodeShardAssignments.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
for (node in nodeShardAssignments) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
node.key,
monitor,
Expand All @@ -334,7 +350,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
workflowRunContext
)
transportService!!.sendRequest(
node.value,
nodeMap[node.key],
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
Expand All @@ -351,11 +367,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
}

MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = emptyMap())
} catch (e: Exception) {
Expand Down

0 comments on commit 52a04bf

Please sign in to comment.