Skip to content

Commit

Permalink
Added fix when executing the workflow and when chained findings index… (
Browse files Browse the repository at this point in the history
#890)

Signed-off-by: Stevan Buzejic <[email protected]>
  • Loading branch information
stevanbz authored and eirsep committed May 25, 2023
1 parent f9b97df commit 4e9f860
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.workflow

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.alerting.BucketLevelMonitorRunner
import org.opensearch.alerting.DocumentLevelMonitorRunner
import org.opensearch.alerting.MonitorRunnerExecutionContext
Expand Down Expand Up @@ -83,8 +84,14 @@ object CompositeWorkflowRunner : WorkflowRunner() {
try {
indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId)
} catch (e: Exception) {
logger.error("Failed to execute workflow. Error: ${e.message}", e)
return workflowResult.copy(error = AlertingException.wrap(e))
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
// If it is not IndexNotFound exception return the result
if (unwrappedException.message?.contains("Configured indices are not found") == false) {
logger.error("Failed to execute workflow. Error: ${e.message}", e)
return workflowResult.copy(error = AlertingException.wrap(e))
}
// Log that finding index is not found and proceed with the execution
logger.error("Finding index ${chainedMonitor.dataSources.findingsIndex} doesn't exist")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {
// First execution
val workflowId = workflowResponse.id
val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, true)

assertNotNull("Workflow run result is null", executeWorkflowResponse)
val monitorsRunResults = executeWorkflowResponse!!.workflowRunResult.workflowRunResult
assertEquals(2, monitorsRunResults.size)
Expand Down Expand Up @@ -707,4 +708,91 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() {

assertEquals(alertSize, acknowledgeAlertResponse.acknowledged.size)
}

fun `test execute workflow with bucket-level and doc-level chained monitors`() {
createTestIndex(TEST_HR_INDEX)

val compositeSources = listOf(
TermsValuesSourceBuilder("test_field").field("test_field")
)
val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources)
val input = SearchInput(
indices = listOf(TEST_HR_INDEX),
query = SearchSourceBuilder().size(0).query(QueryBuilders.matchAllQuery()).aggregation(compositeAgg)
)
val triggerScript = """
params.docCount > 0
""".trimIndent()

var trigger = randomBucketLevelTrigger()
trigger = trigger.copy(
bucketSelector = BucketSelectorExtAggregationBuilder(
name = trigger.id,
bucketsPathsMap = mapOf("docCount" to "_count"),
script = Script(triggerScript),
parentBucketPath = "composite_agg",
filter = null
),
actions = listOf()
)
val bucketMonitor = createMonitor(
randomBucketLevelMonitor(
inputs = listOf(input),
enabled = false,
triggers = listOf(trigger)
)
)
assertNotNull("The bucket monitor was not created", bucketMonitor)

val docQuery1 = DocLevelQuery(query = "test_field:\"a\"", name = "3")
var monitor1 = randomDocumentLevelMonitor(
inputs = listOf(DocLevelMonitorInput("description", listOf(TEST_HR_INDEX), listOf(docQuery1))),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN))
)
val docMonitor = createMonitor(monitor1)!!
assertNotNull("The doc level monitor was not created", docMonitor)

val workflow = randomWorkflow(monitorIds = listOf(bucketMonitor!!.id, docMonitor.id))
val workflowResponse = upsertWorkflow(workflow)
assertNotNull("The workflow was not created", workflowResponse)

// Add a doc that is accessible to the user
indexDoc(
TEST_HR_INDEX,
"1",
"""
{
"test_field": "a",
"accessible": true
}
""".trimIndent()
)

// Add a second doc that is not accessible to the user
indexDoc(
TEST_HR_INDEX,
"2",
"""
{
"test_field": "b",
"accessible": false
}
""".trimIndent()
)

indexDoc(
TEST_HR_INDEX,
"3",
"""
{
"test_field": "c",
"accessible": true
}
""".trimIndent()
)

val executeResult = executeWorkflow(id = workflowResponse!!.id)
assertNotNull(executeResult)
assertEquals(2, executeResult!!.workflowRunResult.workflowRunResult.size)
}
}

0 comments on commit 4e9f860

Please sign in to comment.