Skip to content

Commit

Permalink
fix workflow execution for first run (#1220)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Oct 4, 2023
1 parent 2197d46 commit f5fc62c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.MonitorRunnerService.monitorCtx
import org.opensearch.alerting.WorkflowMetadataService
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.addFilter
Expand All @@ -43,6 +46,7 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isQueryLevelMonitor
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.CompositeWorkflowRunner
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
Expand Down Expand Up @@ -372,6 +376,37 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val createdWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, createdWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = createdWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (createdWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
var (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false) {
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
MonitorMetadataService.upsertMetadata(monitorMetadata, updating = true)
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down Expand Up @@ -499,6 +534,33 @@ class TransportIndexWorkflowAction @Inject constructor(
)
return
}

val updatedWorkflow = request.workflow.copy(id = indexResponse.id)
val executionId = CompositeWorkflowRunner.generateExecutionId(false, updatedWorkflow)

val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata(
workflow = updatedWorkflow,
skipIndex = false,
executionId = executionId
)

val delegates = (updatedWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order }
val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size)

for (monitor in monitors) {
val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = true,
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true)
}
}
actionListener.onResponse(
IndexWorkflowResponse(
indexResponse.id, indexResponse.version, indexResponse.seqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
}
}

private fun generateExecutionId(
fun generateExecutionId(
isTempWorkflow: Boolean,
workflow: Workflow,
): String {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3150,8 +3150,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
searchWorkflowMetadata(id = workflowId)
} catch (ex: java.lang.Exception) {
exception = ex
assertTrue(exception is java.util.NoSuchElementException)
}
assertTrue(exception is java.util.NoSuchElementException)
}

fun `test execute workflow with custom alerts and finding index with bucket and doc monitor bucket monitor used as chained finding`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,4 +1140,49 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val acknowledged = acknowledgeChainedAlertsResponse["success"] as List<String>
Assert.assertEquals(acknowledged[0], alerts1[0]["id"])
}

fun `test run workflow as scheduled job success`() {
val index = createTestIndex()
val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput(
"description", listOf(index), listOf(docQuery1)
)
val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)

val monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = false
)
val monitorResponse = createMonitor(monitor)

val workflow = randomWorkflow(
monitorIds = listOf(monitorResponse.id),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)

val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity())

assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus())

val responseBody = createResponse.asMap()
val createdId = responseBody["_id"] as String
val createdVersion = responseBody["_version"] as Int

assertNotEquals("response is missing Id", Workflow.NO_ID, createdId)
assertTrue("incorrect version", createdVersion > 0)
assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location"))

val testDoc = """{
"message" : "This is an error from IAD region",
"test_field" : "us-west-2"
}"""

indexDoc(index, "1", testDoc)
Thread.sleep(80000)

val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}
}

0 comments on commit f5fc62c

Please sign in to comment.