Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance per-bucket and per-document monitor notification message context. #1450

Merged
merged 17 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
Expand All @@ -25,7 +27,9 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -220,6 +224,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
}

AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
// The alertSampleDocs map structure is Map<TriggerId, Map<BucketKeysHash, List<Map<String, Any>>>>
val alertSampleDocs = mutableMapOf<String, Map<String, List<Map<String, Any>>>>()
for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
val completedAlertsToUpdate = mutableSetOf<Alert>()
Expand All @@ -230,6 +236,32 @@ object BucketLevelMonitorRunner : MonitorRunner() {
?: mutableListOf()
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)

// Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data.
val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()
if (isTriggered && printsSampleDocData(trigger)) {
try {
val searchRequest = monitorCtx.inputService!!.getSearchRequest(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This current implementation executes the monitor query multiple times; the first time on line 124 to collect the data for trigger evaluation, and then subsequent searches are executed for each triggered trigger in order to collect sample documents.

Ideally, we want to collect the sample documents in the call to collectInputResults on line 124 so we can avoid multiple queries as that will improve performance.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created issue #1481 to track this follow-up item.

monitor = monitor.copy(triggers = listOf(trigger)),
searchInput = monitor.inputs[0] as SearchInput,
periodStart = periodStart,
periodEnd = periodEnd,
prevResult = monitorResult.inputResults,
matchingDocIdsPerIndex = null,
returnSampleDocs = true
)
val sampleDocumentsByBucket = getSampleDocs(
client = monitorCtx.client!!,
monitorId = monitor.id,
triggerId = trigger.id,
searchRequest = searchRequest
)
alertSampleDocs[trigger.id] = sampleDocumentsByBucket
} catch (e: Exception) {
logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e)
}
}

val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

Expand All @@ -255,8 +287,11 @@ object BucketLevelMonitorRunner : MonitorRunner() {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)

val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorOrTriggerError
alertCategory, alertContext, triggerCtx, monitorOrTriggerError
)
// AggregationResultBucket should not be null here
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
Expand Down Expand Up @@ -287,7 +322,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
newAlerts = newAlerts,
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
Expand Down Expand Up @@ -480,17 +517,93 @@ object BucketLevelMonitorRunner : MonitorRunner() {

/**
 * Builds a copy of the trigger execution context scoped to a single alert, placing the
 * alert in the list matching its [AlertCategory] and clearing the other two lists.
 *
 * Only NEW alerts carry the full [AlertContext] (with sample docs); DEDUPED and
 * COMPLETED alerts expose just the underlying [Alert].
 *
 * @param alertCategory The category (DEDUPED, NEW, or COMPLETED) of the alert being processed.
 * @param alertContext The alert (and, for NEW alerts, its sample-doc context) to inject.
 * @param ctx The base trigger execution context to copy.
 * @param error Any monitor- or trigger-level error to surface in the context.
 */
private fun getActionContextForAlertCategory(
    alertCategory: AlertCategory,
    alertContext: AlertContext,
    ctx: BucketLevelTriggerExecutionContext,
    error: Exception?
): BucketLevelTriggerExecutionContext {
    return when (alertCategory) {
        AlertCategory.DEDUPED ->
            ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
        AlertCategory.NEW ->
            ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
        AlertCategory.COMPLETED ->
            ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
    }
}

/**
 * Wraps an [Alert] in an [AlertContext], attaching the sample documents collected for the
 * alert's aggregation bucket when they are available.
 *
 * Sample docs are looked up by trigger ID and the alert's bucket keys hash. If either the
 * bucket key or the collected docs are missing, the failure is logged and an [AlertContext]
 * with an empty sample-doc list is returned instead.
 */
private fun getAlertContext(
    alert: Alert,
    alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
): AlertContext {
    val bucketKeysHash = alert.aggregationResultBucket?.getBucketKeysHash()
    val docsForBucket =
        if (bucketKeysHash.isNullOrEmpty()) null
        else alertSampleDocs[alert.triggerId]?.get(bucketKeysHash)

    if (docsForBucket.isNullOrEmpty()) {
        logger.error(
            "Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.",
            alert.id,
            alert.triggerId,
            alert.monitorId,
            alert.executionId
        )
        return AlertContext(alert = alert, sampleDocs = listOf())
    }

    return AlertContext(alert = alert, sampleDocs = docsForBucket)
}

/**
 * Executes the monitor's query with the addition of 2 top_hits aggregations that are used to return the top 5,
 * and bottom 5 documents for each bucket.
 *
 * @return Map of bucket keys hash to the deduplicated sample documents for that bucket,
 * ordered top hits first followed by the low hits in reverse (i.e., descending) order.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun getSampleDocs(
    client: Client,
    monitorId: String,
    triggerId: String,
    searchRequest: SearchRequest
): Map<String, List<Map<String, Any>>> {
    val sampleDocumentsByBucket = mutableMapOf<String, List<Map<String, Any>>>()
    val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
    val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf<String, Any>()) as Map<String, Any>
    // NOTE(review): assumes the monitor query uses a composite aggregation named "composite_agg"
    // (what the frontend visual editor generates); other aggregation types are not yet supported.
    val compositeAgg = aggs.getOrDefault("composite_agg", mapOf<String, Any>()) as Map<String, Any>
    val buckets = compositeAgg.getOrDefault("buckets", emptyList<Map<String, Any>>()) as List<Map<String, Any>>

    buckets.forEach { bucket ->
        val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf<String, String>()) as Map<String, String>).values.toList())
        if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.")

        val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf<String, Any>()) as Map<String, Any>)
            .getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
        val topHits = unwrappedTopHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

        val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf<String, Any>()) as Map<String, Any>)
            .getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
        val lowHits = unwrappedLowHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

        // Reversing the order of lowHits so allHits will be in descending order.
        val allHits = topHits + lowHits.reversed()

        if (allHits.isEmpty()) {
            // We expect sample documents to be available for each bucket.
            logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId)
        }

        // Remove duplicate hits: top_hits and low_hits each return a max of 5 docs, so the same
        // document can appear in both lists when the bucket holds fewer than 10 documents.
        // distinctBy keeps the first occurrence, preserving the descending order established above.
        sampleDocumentsByBucket[bucketKey] = allHits.distinctBy { it["_id"] }
    }

    return sampleDocumentsByBucket
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,28 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.get.MultiGetItemResponse
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
Expand Down Expand Up @@ -64,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
Expand All @@ -83,6 +90,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

// Maps a finding ID to the related document.
private val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()

override suspend fun runMonitor(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
Expand All @@ -95,6 +105,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
monitorCtx.findingsToTriggeredQueries = mutableMapOf()

try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
Expand Down Expand Up @@ -455,7 +466,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
error = monitorResult.error ?: triggerResult.error
)

if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty())
getDocSources(
findingToDocPairs = findingToDocPairs,
monitorCtx = monitorCtx,
monitor = monitor
)

val alerts = mutableListOf<Alert>()
val alertContexts = mutableListOf<AlertContext>()
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
Expand All @@ -466,6 +485,18 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
workflorwRunContext = workflowRunContext
)
alerts.add(alert)

val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap()

alertContexts.add(
AlertContext(
alert = alert,
associatedQueries = alert.findingIds.flatMap { findingId ->
monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList()
},
sampleDocs = listOfNotNull(docSource)
)
)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
Expand All @@ -479,13 +510,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
for (action in trigger.actions) {
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alert in alerts) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
for (alertContext in alertContexts) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alertContext)), monitorCtx, monitor, dryrun)
triggerResult.actionResultsMap.getOrPut(alertContext.alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alertContext.alert.id]?.set(action.id, actionResults)
}
} else if (alerts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun)
} else if (alertContexts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun)
for (alert in alerts) {
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
Expand Down Expand Up @@ -532,6 +563,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
val findingsToTriggeredQueries = mutableMapOf<String, List<DocLevelQuery>>()

docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
Expand All @@ -552,6 +584,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
findingsToTriggeredQueries[finding.id] = triggeredQueries

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
Expand All @@ -578,6 +611,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// suppress exception
logger.error("Optional finding callback failed", e)
}

if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries
else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries

return findingDocPairs
}

Expand Down Expand Up @@ -1047,6 +1084,40 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return numDocs >= maxNumDocsThreshold
}

/**
 * Performs an mGet request to retrieve the source documents associated with findings,
 * storing the results in [findingIdToDocSource] keyed by finding ID.
 *
 * When possible, this will only retrieve the document fields that are specifically
 * referenced for printing in the mustache template.
 */
private suspend fun getDocSources(
    findingToDocPairs: List<Pair<String, String>>,
    monitorCtx: MonitorRunnerExecutionContext,
    monitor: Monitor
) {
    val docFieldTags = parseSampleDocTags(monitor.triggers)

    // Perform one mGet request per batch to bound the size of each request.
    findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
        // The request must be batch-local; reusing one request across batches would
        // re-fetch every previously added doc on each subsequent mGet call.
        val request = MultiGetRequest()
        // Finding IDs in the same order as the items added to the request, so the
        // responses (returned in request order) can be mapped back to their findings.
        val findingIds = mutableListOf<String>()

        batch.forEach { (findingId, docIdAndIndex) ->
            // docIdAndIndex is formatted "<docId>|<concreteIndex>".
            val docIdAndIndexSplit = docIdAndIndex.split("|")
            val docId = docIdAndIndexSplit.getOrNull(0)
            val concreteIndex = docIdAndIndexSplit.getOrNull(1)
            if (findingId.isNotEmpty() && !docId.isNullOrEmpty() && !concreteIndex.isNullOrEmpty()) {
                val docItem = MultiGetRequest.Item(concreteIndex, docId)
                // Restrict the fetched _source to template-referenced fields, when any are known.
                if (docFieldTags.isNotEmpty())
                    docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
                request.add(docItem)
                findingIds.add(findingId)
            }
        }
        if (findingIds.isEmpty()) return@forEach

        // Execute the mGet once per batch (not once per finding), and map each response
        // item back to the finding whose document it fetched.
        val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
        response.responses.forEachIndexed { index, item ->
            findingIds.getOrNull(index)?.let { findingIdToDocSource[it] = item }
        }
    }
}

/**
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
Expand Down
Loading
Loading