Skip to content

Commit

Permalink
fix coordinator node logic
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Feb 23, 2024
1 parent f44d05c commit e4cf101
Show file tree
Hide file tree
Showing 9 changed files with 227 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
AlertingSettings.MAX_ACTIONABLE_ALERT_COUNT,
AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES,
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT,
LegacyOpenDistroAlertingSettings.BULK_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -83,6 +81,9 @@ import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collectors
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand All @@ -96,8 +97,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String,
transportService: TransportService?
transportService: TransportService?,
): MonitorRunResult<DocumentLevelTriggerRunResult> {
if (transportService == null)
throw RuntimeException("transport service should not be null")
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
Expand Down Expand Up @@ -182,6 +185,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
val queryingStartTimeMillis = System.currentTimeMillis()
val docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse> = mutableListOf()
docLevelMonitorInput.indices.forEach { indexName ->

var concreteIndices = IndexUtils.resolveAllIndices(
Expand Down Expand Up @@ -278,8 +282,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
)

val shards = indexUpdatedRunContext.keys
val shards = mutableSetOf<String>()
shards.addAll(indexUpdatedRunContext.keys)
shards.remove("index")
shards.remove("shards_count")

Expand Down Expand Up @@ -307,57 +311,62 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.info(it1.id.toString())
}
}

val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
}

override fun onFailure(e: Exception) {
logger.info("hit here2")
}
},
nodeMap.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
node.key,
monitor,
monitorMetadata,
executionId,
listOf(indexExecutionContext),
nodeShardAssignments[node.key]!!.toList(),
workflowRunContext
)
transportService!!.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
val responses: Collection<DocLevelMonitorFanOutResponse> = suspendCoroutine { cont ->
val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
override fun onResponse(response: Collection<DocLevelMonitorFanOutResponse>) {
logger.info("hit here1")
cont.resume(response)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
override fun onFailure(e: Exception) {
logger.info("Fan out failed")
cont.resumeWithException(e)
}
}
},
nodeMap.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
node.key,
monitor,
monitorMetadata,
executionId,
listOf(indexExecutionContext),
nodeShardAssignments[node.key]!!.toList(),
workflowRunContext
)

transportService.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}
)
}
}
docLevelMonitorFanOutResponses.addAll(responses)
}
}

updateLastRunContextFromFanOutResponses(docLevelMonitorFanOutResponses, updatedLastRunContext)
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
// TODO: Update the Document as part of the Trigger and return back the trigger action result
return monitorResult.copy(triggerResults = emptyMap())
return monitorResult.copy(triggerResults = buildTriggerResults(docLevelMonitorFanOutResponses))
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, executionId, workflowRunContext)
Expand All @@ -382,6 +391,30 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

private fun buildTriggerResults(
docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>,
): Map<String, DocumentLevelTriggerRunResult> {
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
for (res in docLevelMonitorFanOutResponses) {
for (triggerId in res.triggerResults.keys) {
val documentLevelTriggerRunResult = res.triggerResults[triggerId]
if (documentLevelTriggerRunResult != null) {
if (false == triggerResults.contains(triggerId)) {
val documentLevelTriggerRunResult = documentLevelTriggerRunResult
triggerResults[triggerId] = documentLevelTriggerRunResult!!
} else {
val currVal = triggerResults[triggerId]
val newTrigggeredDocs = mutableListOf<String>()
newTrigggeredDocs.addAll(currVal!!.triggeredDocs)
newTrigggeredDocs.addAll(documentLevelTriggerRunResult.triggeredDocs)
triggerResults.put(triggerId, currVal.copy(triggeredDocs = newTrigggeredDocs))
}
}
}
}
return triggerResults
}

private suspend fun onSuccessfulMonitorRun(monitorCtx: MonitorRunnerExecutionContext, monitor: Monitor) {
monitorCtx.alertService!!.clearMonitorErrorAlert(monitor)
if (monitor.dataSources.alertsHistoryIndex != null) {
Expand Down Expand Up @@ -597,6 +630,35 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}

private fun updateLastRunContextFromFanOutResponses(
docLevelMonitorFanOutResponses: MutableList<DocLevelMonitorFanOutResponse>,
updatedLastRunContext: MutableMap<String, MutableMap<String, Any>>,
) {
// Prepare updatedLastRunContext for each index
for (indexName in updatedLastRunContext.keys) {
for (fanOutResponse in docLevelMonitorFanOutResponses) {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains(indexName)) {
val partialUpdatedIndexLastRunContext = fanOutResponse.lastRunContexts[indexName] as MutableMap<String, Any>
partialUpdatedIndexLastRunContext.keys.forEach {

val seq_no = partialUpdatedIndexLastRunContext[it].toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
seq_no != null &&
seq_no != SequenceNumbers.UNASSIGNED_SEQ_NO.toInt()
) {
indexLastRunContext[it] = seq_no
}
}
}
}
}
}

private fun initializeNewLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
Expand Down Expand Up @@ -670,7 +732,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

private fun getShardsCount(clusterService: ClusterService, index: String): Int {
val allShards: List<ShardRouting> = clusterService!!.state().routingTable().allShards(index)
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
return allShards.filter { it.primary() }.size
}

Expand Down Expand Up @@ -1057,7 +1119,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsBytesSize: Long,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit
val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes
val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

Expand All @@ -1068,23 +1130,23 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
numDocs: Int,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings)
var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory
return numDocs >= maxNumDocsThreshold
}

private suspend fun getNodes(monitorCtx: MonitorRunnerExecutionContext): MutableMap<String, DiscoveryNode> {
return monitorCtx.clusterService!!.state().nodes.dataNodes
}

private suspend fun distributeShards(
private fun distributeShards(
monitorCtx: MonitorRunnerExecutionContext,
allNodes: List<String>,
shards: List<String>,
index: String
index: String,
): Map<String, MutableSet<ShardId>> {

val totalShards = shards.size
val totalNodes = allNodes.size.coerceAtMost(totalShards / 2)
val totalNodes = monitorCtx.totalNodesFanOut.coerceAtMost((totalShards + 1) / 2)
val shardsPerNode = totalShards / totalNodes
var shardsRemaining = totalShards % totalNodes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,9 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
@Volatile var percQueryDocsSizeMemoryPercentageLimit: Int =
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
)
Loading

0 comments on commit e4cf101

Please sign in to comment.