Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize execution of workflow consisting of bucket-level followed by doc-level monitors #1729

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex?.first
val findingIdsForMatchingDocIds = workflowRunContext?.matchingDocIdsPerIndex?.second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this map each individual doc id to finding id

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will still have one part <index to doc ids> and second part list of findingids.


val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
Expand Down Expand Up @@ -226,6 +227,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class InputService(
periodStart = periodStart,
periodEnd = periodEnd,
prevResult = prevResult,
matchingDocIdsPerIndex = matchingDocIdsPerIndex,
matchingDocIdsPerIndex = matchingDocIdsPerIndex?.first,
returnSampleDocs = false
)
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkflowService(
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
return indexToRelatedDocIdsMap
return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,11 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
/**
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
* generates a single alert with multiple findings.
*/
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
Expand All @@ -312,9 +316,9 @@ class TransportDocLevelMonitorFanOutAction
workflowRunContext = workflowRunContext
)
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerIgnoringFindingsAndAlerts(
triggerResults[it.id] = runForEachDocTriggerWithoutPersistFindingsAndAlerts(
monitorResult,
it as DocumentLevelTrigger,
monitor,
Expand Down Expand Up @@ -363,7 +367,10 @@ class TransportDocLevelMonitorFanOutAction
}
}

private suspend fun runForEachDocTriggerIgnoringFindingsAndAlerts(
/**
* run doc-level triggers ignoring findings and alerts and generating a single alert.
*/
private suspend fun runForEachDocTriggerWithoutPersistFindingsAndAlerts(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
Expand All @@ -374,9 +381,14 @@ class TransportDocLevelMonitorFanOutAction
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val findingIds = if (workflowRunContext?.matchingDocIdsPerIndex?.second != null) {
workflowRunContext.matchingDocIdsPerIndex.second
} else {
listOf()
}
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
listOf(),
findingIds,
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
Expand Down Expand Up @@ -570,7 +582,7 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding and (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false)) {
if (shouldCreateFinding and (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false)) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -582,7 +594,7 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
Expand Down Expand Up @@ -945,11 +957,11 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (monitor.ignoreFindingsAndAlerts == null || monitor.ignoreFindingsAndAlerts == false) {
if (monitor.shouldPersistFindingsAndAlerts == null || monitor.shouldPersistFindingsAndAlerts == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.ignoreFindingsAndAlerts == true) {
} else if (monitor.shouldPersistFindingsAndAlerts == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
var lastErrorDelegateRun: Exception? = null

for (delegate in delegates) {
var indexToDocIds = mapOf<String, List<String>>()
var indexToDocIdsWithFindings: Pair<Map<String, List<String>>, List<String>>? = Pair(mapOf(), listOf())
var delegateMonitor: Monitor
delegateMonitor = monitorsById[delegate.monitorId]
?: throw AlertingException.wrap(
Expand All @@ -118,7 +118,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}

try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
indexToDocIdsWithFindings = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitors, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow due to failure in chained findings. Error: ${e.message}", e)
return WorkflowRunResult(
Expand All @@ -131,7 +131,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
workflowId = workflowMetadata.workflowId,
workflowMetadataId = workflowMetadata.id,
chainedMonitorId = delegate.chainedMonitorFindings?.monitorId,
matchingDocIdsPerIndex = indexToDocIds,
matchingDocIdsPerIndex = indexToDocIdsWithFindings!!,
auditDelegateMonitorAlerts = if (workflow.auditDelegateMonitorAlerts == null) true
else workflow.auditDelegateMonitorAlerts!!
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest(
Expand Down Expand Up @@ -156,6 +157,7 @@ public void onFailure(Exception e) {
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest(
Expand Down Expand Up @@ -240,6 +242,7 @@ public void onFailure(Exception e) {
Map.of(),
new DataSources(),
false,
false,
"sample-remote-monitor-plugin"
);
IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest(
Expand Down
Loading