Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc-level monitor fan-out approach #1496

Merged
merged 17 commits into from
Apr 17, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,11 @@ import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocDat
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -321,7 +308,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
) {
override fun handleException(e: TransportException) {
logger.error("Fan out retry failed in node ${localNode.id}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz log exception. If we dont log exception debugging is tough .

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed this.

listener.onFailure(e)
listener.onResponse(
DocLevelMonitorFanOutResponse(
"",
"",
"",
mutableMapOf(),
exception = AlertingException.wrap(e) as AlertingException
)
)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
Expand All @@ -331,7 +326,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
} else {
logger.error("Fan out failed in node ${node.key}", e)
listener.onFailure(e)
listener.onResponse(
DocLevelMonitorFanOutResponse(
"",
"",
"",
mutableMapOf(),
exception = AlertingException.wrap(e) as AlertingException
)
)
}
}

Expand Down Expand Up @@ -389,20 +392,22 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare updatedLastRunContext for each index
for (indexName in updatedLastRunContext.keys) {
for (fanOutResponse in docLevelMonitorFanOutResponses) {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
seq_no != null &&
seq_no >= 0
) {
indexLastRunContext[it] = seq_no
if (fanOutResponse.exception == null) {
// fanOutResponse.lastRunContexts //updatedContexts for relevant shards
val indexLastRunContext = updatedLastRunContext[indexName] as MutableMap<String, Any>

if (fanOutResponse.lastRunContexts.contains("index") && fanOutResponse.lastRunContexts["index"] == indexName) {
fanOutResponse.lastRunContexts.keys.forEach {

val seq_no = fanOutResponse.lastRunContexts[it].toString().toIntOrNull()
if (
it != "shards_count" &&
it != "index" &&
seq_no != null &&
seq_no >= 0
) {
indexLastRunContext[it] = seq_no
}
}
}
}
Expand All @@ -416,36 +421,38 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
val triggerErrorMap = mutableMapOf<String, MutableList<AlertingException>>()
for (res in docLevelMonitorFanOutResponses) {
for (triggerId in res.triggerResults.keys) {
val documentLevelTriggerRunResult = res.triggerResults[triggerId]
if (documentLevelTriggerRunResult != null) {
if (false == triggerResults.contains(triggerId)) {
triggerResults[triggerId] = documentLevelTriggerRunResult
triggerErrorMap[triggerId] = if (documentLevelTriggerRunResult.error != null) {
val error = if (documentLevelTriggerRunResult.error is AlertingException) {
documentLevelTriggerRunResult.error as AlertingException
if (res.exception == null) {
for (triggerId in res.triggerResults.keys) {
val documentLevelTriggerRunResult = res.triggerResults[triggerId]
if (documentLevelTriggerRunResult != null) {
if (false == triggerResults.contains(triggerId)) {
triggerResults[triggerId] = documentLevelTriggerRunResult
triggerErrorMap[triggerId] = if (documentLevelTriggerRunResult.error != null) {
val error = if (documentLevelTriggerRunResult.error is AlertingException) {
documentLevelTriggerRunResult.error as AlertingException
} else {
AlertingException.wrap(documentLevelTriggerRunResult.error!!) as AlertingException
}
mutableListOf(error)
} else {
AlertingException.wrap(documentLevelTriggerRunResult.error!!) as AlertingException
mutableListOf()
}
mutableListOf(error)
} else {
mutableListOf()
}
} else {
val currVal = triggerResults[triggerId]
val newTriggeredDocs = mutableListOf<String>()
newTriggeredDocs.addAll(currVal!!.triggeredDocs)
newTriggeredDocs.addAll(documentLevelTriggerRunResult.triggeredDocs)
val newActionResults = mutableMapOf<String, MutableMap<String, ActionRunResult>>()
newActionResults.putAll(currVal.actionResultsMap)
newActionResults.putAll(documentLevelTriggerRunResult.actionResultsMap)
triggerResults[triggerId] = currVal.copy(
triggeredDocs = newTriggeredDocs,
actionResultsMap = newActionResults
)
val currVal = triggerResults[triggerId]
val newTriggeredDocs = mutableListOf<String>()
newTriggeredDocs.addAll(currVal!!.triggeredDocs)
newTriggeredDocs.addAll(documentLevelTriggerRunResult.triggeredDocs)
val newActionResults = mutableMapOf<String, MutableMap<String, ActionRunResult>>()
newActionResults.putAll(currVal.actionResultsMap)
newActionResults.putAll(documentLevelTriggerRunResult.actionResultsMap)
triggerResults[triggerId] = currVal.copy(
triggeredDocs = newTriggeredDocs,
actionResultsMap = newActionResults
)

if (documentLevelTriggerRunResult.error != null) {
triggerErrorMap[triggerId]!!.add(documentLevelTriggerRunResult.error as AlertingException)
if (documentLevelTriggerRunResult.error != null) {
triggerErrorMap[triggerId]!!.add(documentLevelTriggerRunResult.error as AlertingException)
}
}
}
}
Expand All @@ -464,17 +471,19 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val errors: MutableList<AlertingException> = mutableListOf()
for (response in docLevelMonitorFanOutResponses) {
if (response.inputResults.error != null) {
if (response.inputResults.error is AlertingException) {
errors.add(response.inputResults.error)
} else {
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
if (response.exception == null) {
if (response.inputResults.error != null) {
if (response.inputResults.error is AlertingException) {
errors.add(response.inputResults.error)
} else {
errors.add(AlertingException.wrap(response.inputResults.error) as AlertingException)
}
}
}
val partialResult = response.inputResults.results
for (result in partialResult) {
for (id in result.keys) {
inputRunResults.getOrPut(id) { mutableSetOf() }.addAll(result[id] as Collection<String>)
val partialResult = response.inputResults.results
for (result in partialResult) {
for (id in result.keys) {
inputRunResults.getOrPut(id) { mutableSetOf() }.addAll(result[id] as Collection<String>)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting.action

import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.util.AlertingException
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
Expand All @@ -22,6 +23,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
val lastRunContexts: MutableMap<String, Any>
val inputResults: InputRunResults
val triggerResults: Map<String, DocumentLevelTriggerRunResult>
val exception: AlertingException?

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Expand All @@ -30,7 +32,8 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
monitorId = sin.readString(),
lastRunContexts = sin.readMap()!! as MutableMap<String, Any>,
inputResults = InputRunResults.readFrom(sin),
triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom))
triggerResults = suppressWarning(sin.readMap(StreamInput::readString, DocumentLevelTriggerRunResult::readFrom)),
exception = sin.readException()
)

constructor(
Expand All @@ -40,13 +43,15 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
lastRunContexts: MutableMap<String, Any>,
inputResults: InputRunResults = InputRunResults(), // partial,
triggerResults: Map<String, DocumentLevelTriggerRunResult> = mapOf(),
exception: AlertingException? = null
) : super() {
this.nodeId = nodeId
this.executionId = executionId
this.monitorId = monitorId
this.lastRunContexts = lastRunContexts
this.inputResults = inputResults
this.triggerResults = triggerResults
this.exception = exception
}

@Throws(IOException::class)
Expand All @@ -61,6 +66,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
StreamOutput::writeString,
{ stream, stats -> stats.writeTo(stream) }
)
out.writeException(exception)
}

@Throws(IOException::class)
Expand All @@ -72,6 +78,7 @@ class DocLevelMonitorFanOutResponse : ActionResponse, ToXContentObject {
.field("last_run_contexts", lastRunContexts)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rest of the fields?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed it.

.field("input_results", inputResults)
.field("trigger_results", triggerResults)
.field("exception", exception)
.endObject()
return builder
}
Expand Down
Loading