Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize execution of workflow consisting of bucket-level followed by doc-level monitors #1729

Merged
merged 6 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/auto-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Get tag
id: tag
uses: dawidd6/action-get-tag@v1
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- uses: ncipollo/release-action@v1
with:
github_token: ${{ steps.github_app_token.outputs.token }}
Expand Down
6 changes: 4 additions & 2 deletions .github/workflows/bwc-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ jobs:
# this image tag is subject to change as more dependencies and updates will arrive over time
image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
# need to switch to root so that github actions can install runner binary on container without permission issues.
options: --user root
options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}

steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
uses: actions/checkout@v4
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/maven-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
with:
distribution: temurin # Temurin is a distribution of adoptium
java-version: ${{ matrix.jdk }}
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: aws-actions/configure-aws-credentials@v1
with:
role-to-assume: ${{ secrets.PUBLISH_SNAPSHOTS_ROLE }}
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/multi-node-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ on:
push:
branches:
- "*"
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
Expand All @@ -30,17 +28,19 @@ jobs:
# this image tag is subject to change as more dependencies and updates will arrive over time
image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
# need to switch to root so that github actions can install runner binary on container without permission issues.
options: --user root
ooptions: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}

steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
with:
java-version: ${{ matrix.java }}
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
uses: actions/checkout@v4
- name: Run integration tests with multi node config
run: |
chown -R 1000:1000 `pwd`
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
build:
strategy:
matrix:
java: [ 21, 23 ]
java: [ 21 ]
# Job name
name: Build and test Alerting
# This job runs on Linux
Expand All @@ -25,7 +25,7 @@ jobs:
java-version: ${{ matrix.java }}
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
uses: actions/checkout@v4
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
Expand Down
18 changes: 10 additions & 8 deletions .github/workflows/test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ on:
push:
branches:
- "*"
env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
Get-CI-Image-Tag:
Expand All @@ -33,12 +31,14 @@ jobs:
# this image tag is subject to change as more dependencies and updates will arrive over time
image: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-version-linux }}
# need to switch to root so that github actions can install runner binary on container without permission issues.
options: --user root
options: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-options }}

steps:
- name: Run start commands
run: ${{ needs.Get-CI-Image-Tag.outputs.ci-image-start-command }}
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
uses: actions/checkout@v4
# This step uses the setup-java Github action: https://github.com/actions/setup-java
- name: Set Up JDK ${{ matrix.java }}
uses: actions/setup-java@v1
Expand All @@ -54,15 +54,16 @@ jobs:
cp ./alerting/build/distributions/*.zip alerting-artifacts
# This step uses the codecov-action Github action: https://github.com/codecov/codecov-action
- name: Upload Coverage Report
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
# This step uses the upload-artifact Github action: https://github.com/actions/upload-artifact
- name: Upload Artifacts
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: alerting-plugin-${{ matrix.os }}
path: alerting-artifacts
overwrite: true

build:
needs: Get-CI-Image-Tag
Expand All @@ -85,7 +86,7 @@ jobs:
steps:
# This step uses the checkout Github action: https://github.com/actions/checkout
- name: Checkout Branch
uses: actions/checkout@v2
uses: actions/checkout@v4
# This is a hack, but this step creates a link to the X: mounted drive, which makes the path
# short enough to work on Windows
- name: Shorten Path
Expand All @@ -107,7 +108,8 @@ jobs:
cp ./alerting/build/distributions/*.zip alerting-artifacts
# This step uses the upload-artifact Github action: https://github.com/actions/upload-artifact
- name: Upload Artifacts
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: alerting-plugin-${{ matrix.os }}
path: alerting-artifacts
overwrite: true
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.sort.SortOrder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID
Expand Down Expand Up @@ -479,7 +480,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val queryBuilder = if (input.query.query() == null) BoolQueryBuilder()
else QueryBuilders.boolQuery().must(source.query())
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
sr.source().query(queryBuilder).sort("_seq_no", SortOrder.DESC)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we sorting based on _seq_no?

there is already a range query based on period_end variable

this seems incorrect

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to sort this because without sort, we get random 10 docs in period of last 15 minutes by default for a workflow running every 1 min. Now, the aggregation may have grouped 1000 docs.
So, 10 out of 1000 docs generated may not be the latest ones. We pass these 10 docs to the delegated doc-level monitor which has already moved its seq_no past these 10 random docs and hence do not generate an alert.

sort by seq_no ensures we always get the latest 10 docs out of the 1000 docs considered for aggregation. Thus, when the doc-level monitor runs next time, it gets latest 10 docs and it goes on to geenrate an alert.

}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex
val findingIdsForMatchingDocIds = if (workflowRunContext?.findingIds != null) {
workflowRunContext.findingIds
} else {
listOf()
}

val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
Expand Down Expand Up @@ -226,6 +231,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndices,
conflictingFields.toList(),
matchingDocIdsPerIndex?.get(concreteIndexName),
findingIdsForMatchingDocIds
)

val shards = mutableSetOf<String>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class WorkflowService(
* @param chainedMonitors Monitors that have previously executed
* @param workflowExecutionId Execution id of the current workflow
*/
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String): Map<String, List<String>> {
suspend fun getFindingDocIdsByExecutionId(chainedMonitors: List<Monitor>, workflowExecutionId: String):
Pair<Map<String, List<String>>, List<String>> {
if (chainedMonitors.isEmpty())
return emptyMap()
return Pair(emptyMap(), listOf())
val dataSources = chainedMonitors[0].dataSources
try {
val existsResponse: IndicesExistsResponse = client.admin().indices().suspendUntil {
exists(IndicesExistsRequest(dataSources.findingsIndex).local(true), it)
}
if (existsResponse.isExists == false) return emptyMap()
if (existsResponse.isExists == false) return Pair(emptyMap(), listOf())
// Search findings index to match id of monitors and workflow execution id
val bqb = QueryBuilders.boolQuery()
.filter(
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkflowService(
for (finding in findings) {
indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds)
}
return indexToRelatedDocIdsMap
return Pair(indexToRelatedDocIdsMap, findings.map { it.id })
} catch (t: Exception) {
log.error("Error getting finding doc ids: ${t.message}", t)
throw AlertingException.wrap(t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,19 +297,37 @@ class TransportDocLevelMonitorFanOutAction
createFindings(monitor, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
/**
* if should_persist_findings_and_alerts flag is not set, doc-level trigger generates alerts else doc-level trigger
* generates a single alert with multiple findings.
*/
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTrigger(
monitorResult,
it as DocumentLevelTrigger,
monitor,
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun,
executionId = executionId,
findingIdToDocSource,
workflowRunContext = workflowRunContext
)
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
monitor.triggers.forEach {
triggerResults[it.id] = runForEachDocTriggerCreateSingleGroupedAlert(
monitorResult,
it as DocumentLevelTrigger,
monitor,
queryToDocIds,
dryrun,
executionId,
workflowRunContext
)
}
}
}

Expand Down Expand Up @@ -349,6 +367,58 @@ class TransportDocLevelMonitorFanOutAction
}
}

/**
* run doc-level triggers ignoring findings and alerts and generating a single alert.
*/
private suspend fun runForEachDocTriggerCreateSingleGroupedAlert(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
monitor: Monitor,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
executionId: String,
workflowRunContext: WorkflowRunContext?
): DocumentLevelTriggerRunResult {
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, queryToDocIds)
if (triggerResult.triggeredDocs.isNotEmpty()) {
val findingIds = if (workflowRunContext?.findingIds != null) {
workflowRunContext.findingIds
} else {
listOf()
}
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val alert = alertService.composeDocLevelAlert(
findingIds!!,
triggerResult.triggeredDocs,
triggerCtx,
monitorResult.alertError() ?: triggerResult.alertError(),
executionId = executionId,
workflorwRunContext = workflowRunContext
)
for (action in trigger.actions) {
this.runAction(action, triggerCtx.copy(alerts = listOf(AlertContext(alert))), monitor, dryrun)
}

if (!dryrun && monitor.id != Monitor.NO_ID) {
val actionResults = triggerResult.actionResultsMap.getOrDefault(alert.id, emptyMap())
val actionExecutionResults = actionResults.values.map { actionRunResult ->
ActionExecutionResult(actionRunResult.actionId, actionRunResult.executionTime, if (actionRunResult.throttled) 1 else 0)
}
val updatedAlert = alert.copy(actionExecutionResults = actionExecutionResults)

retryPolicy.let {
alertService.saveAlerts(
monitor.dataSources,
listOf(updatedAlert),
it,
routingId = monitor.id
)
}
}
}
return DocumentLevelTriggerRunResult(trigger.name, listOf(), monitorResult.error)
}

private suspend fun runForEachDocTrigger(
monitorResult: MonitorRunResult<DocumentLevelTriggerRunResult>,
trigger: DocumentLevelTrigger,
Expand Down Expand Up @@ -512,7 +582,11 @@ class TransportDocLevelMonitorFanOutAction
.string()
log.debug("Findings: $findingStr")

if (shouldCreateFinding) {
if (shouldCreateFinding and (
monitor.shouldCreateSingleAlertForFindings == null ||
monitor.shouldCreateSingleAlertForFindings == false
)
) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
Expand All @@ -524,13 +598,15 @@ class TransportDocLevelMonitorFanOutAction
bulkIndexFindings(monitor, indexRequests)
}

try {
findings.forEach { finding ->
publishFinding(monitor, finding)
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
try {
findings.forEach { finding ->
publishFinding(monitor, finding)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
} catch (e: Exception) {
// suppress exception
log.error("Optional finding callback failed", e)
}
this.findingsToTriggeredQueries += findingsToTriggeredQueries

Expand Down Expand Up @@ -688,6 +764,7 @@ class TransportDocLevelMonitorFanOutAction
var to: Long = Long.MAX_VALUE
while (to >= from) {
val hits: SearchHits = searchShard(
monitor,
indexExecutionCtx.concreteIndexName,
shard,
from,
Expand Down Expand Up @@ -870,6 +947,7 @@ class TransportDocLevelMonitorFanOutAction
* This method hence fetches only docs from shard which haven't been queried before
*/
private suspend fun searchShard(
monitor: Monitor,
index: String,
shard: String,
prevSeqNo: Long?,
Expand All @@ -883,8 +961,16 @@ class TransportDocLevelMonitorFanOutAction
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
if (monitor.shouldCreateSingleAlertForFindings == null || monitor.shouldCreateSingleAlertForFindings == false) {
if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}
} else if (monitor.shouldCreateSingleAlertForFindings == true) {
val docIdsParam = mutableListOf<String>()
if (docIds != null) {
docIdsParam.addAll(docIds)
}
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIdsParam))
}

val request: SearchRequest = SearchRequest()
Expand Down
Loading
Loading