From 6a68bda57a6881e4b148bbf85cfb64b4efca45f6 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 3 Oct 2023 21:29:09 +0000 Subject: [PATCH] fix workflow execution for first run Signed-off-by: Subhobrata Dey --- .../transport/TransportIndexMonitorAction.kt | 38 ++++----- .../transport/TransportIndexWorkflowAction.kt | 83 +++++++++++++++++++ .../alerting/util/DocLevelMonitorQueries.kt | 23 +++++ .../workflow/CompositeWorkflowRunner.kt | 2 +- .../alerting/MonitorDataSourcesIT.kt | 2 +- .../resthandler/SecureWorkflowRestApiIT.kt | 4 +- .../alerting/resthandler/WorkflowRestApiIT.kt | 45 ++++++++++ 7 files changed, 169 insertions(+), 28 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 8669c038d..8e227f8fe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -511,7 +511,13 @@ class TransportIndexMonitorAction @Inject constructor( } try { if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { - indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) + docLevelMonitorQueries.indexDocLevelMonitorQueries( + request.monitor, + indexResponse.id, + metadata, + request.refreshPolicy, + indexTimeout + ) } // When inserting queries in queryIndex we could update sourceToQueryIndexMapping MonitorMetadataService.upsertMetadata(metadata, updating = true) @@ -548,28 +554,6 @@ class TransportIndexMonitorAction @Inject constructor( } } - @Suppress("UNCHECKED_CAST") - private suspend fun indexDocLevelMonitorQueries( - monitor: Monitor, - monitorId: String, - monitorMetadata: MonitorMetadata, - refreshPolicy: RefreshPolicy - ) { - val queryIndex = monitor.dataSources.queryIndex - if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) { - docLevelMonitorQueries.initDocLevelQueryIndex(monitor.dataSources) - log.info("Central Percolation index $queryIndex created") - } - docLevelMonitorQueries.indexDocLevelQueries( - monitor, - monitorId, - monitorMetadata, - refreshPolicy, - indexTimeout - ) - log.debug("Queries inserted into Percolate index $queryIndex") - } - private suspend fun updateMonitor() { val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) try { @@ -669,7 +653,13 @@ class TransportIndexMonitorAction @Inject constructor( .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) .execute(it) } - indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) + docLevelMonitorQueries.indexDocLevelMonitorQueries( + request.monitor, + currentMonitor.id, + updatedMetadata, + request.refreshPolicy, + indexTimeout + ) MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) } actionListener.onResponse( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index 5a5f22fb3..b1dab6fd8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -27,6 +27,9 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.alerting.MonitorRunnerService.monitorCtx +import org.opensearch.alerting.WorkflowMetadataService import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.addFilter @@ -39,10 +42,12 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_TH import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.isQueryLevelMonitor import org.opensearch.alerting.util.use +import org.opensearch.alerting.workflow.CompositeWorkflowRunner import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject @@ -71,6 +76,9 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task @@ -90,6 +98,7 @@ class TransportIndexWorkflowAction @Inject constructor( val settings: Settings, val xContentRegistry: NamedXContentRegistry, val namedWriteableRegistry: NamedWriteableRegistry, + val docLevelMonitorQueries: DocLevelMonitorQueries ) : HandledTransportAction( AlertingActions.INDEX_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::IndexWorkflowRequest ), @@ -372,6 +381,42 @@ class TransportIndexWorkflowAction @Inject constructor( ) return } + + val createdWorkflow = request.workflow.copy(id = indexResponse.id) + val executionId = CompositeWorkflowRunner.generateExecutionId(false, createdWorkflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = createdWorkflow, + skipIndex = false, + executionId = executionId + ) + + val delegates = (createdWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + + for (monitor in monitors) { + val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata( + monitor = monitor, + createWithRunContext = true, + workflowMetadataId = workflowMetadata.id + ) + + if (created == false) { + log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") + } + + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + docLevelMonitorQueries.indexDocLevelMonitorQueries( + monitor, + monitor.id, + monitorMetadata, + request.refreshPolicy, + indexTimeout + ) + } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(monitorMetadata, updating = true) + } actionListener.onResponse( IndexWorkflowResponse( indexResponse.id, indexResponse.version, indexResponse.seqNo, @@ -499,6 +544,44 @@ class TransportIndexWorkflowAction @Inject constructor( ) return } + + val updatedWorkflow = request.workflow.copy(id = indexResponse.id) + val executionId = CompositeWorkflowRunner.generateExecutionId(false, updatedWorkflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = updatedWorkflow, + skipIndex = false, + executionId = executionId + ) + + val delegates = (updatedWorkflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + val monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + + for (monitor in monitors) { + val (monitorMetadata, created) = MonitorMetadataService.getOrCreateMetadata( + monitor = monitor, + createWithRunContext = true, + workflowMetadataId = workflowMetadata.id + ) + + if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + val updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor) + client.suspendUntil { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(monitor.dataSources.queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) + .execute(it) + } + docLevelMonitorQueries.indexDocLevelMonitorQueries( + monitor, + monitor.id, + updatedMetadata, + request.refreshPolicy, + indexTimeout + ) + MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) + } + } actionListener.onResponse( IndexWorkflowResponse( indexResponse.id, indexResponse.version, indexResponse.seqNo, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index ef4b956b7..df2b58c84 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -590,4 +590,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ private fun getWriteIndexNameForAlias(alias: String): String? { return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name } + + @Suppress("UNCHECKED_CAST") + public suspend fun indexDocLevelMonitorQueries( + monitor: Monitor, + monitorId: String, + monitorMetadata: MonitorMetadata, + refreshPolicy: RefreshPolicy, + indexTimeout: TimeValue + ) { + val queryIndex = monitor.dataSources.queryIndex + if (!docLevelQueryIndexExists(monitor.dataSources)) { + initDocLevelQueryIndex(monitor.dataSources) + log.info("Central Percolation index $queryIndex created") + } + indexDocLevelQueries( + monitor, + monitorId, + monitorMetadata, + refreshPolicy, + indexTimeout + ) + log.debug("Queries inserted into Percolate index $queryIndex") + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 0dbf7eac2..cfed18c89 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -277,7 +277,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { } } - private fun generateExecutionId( + fun generateExecutionId( isTempWorkflow: Boolean, workflow: Workflow, ): String { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index e623dbb9a..2f21b2bc0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -3151,7 +3151,7 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } catch (ex: java.lang.Exception) { exception = ex } - assertTrue(exception is java.util.NoSuchElementException) + assertFalse(exception is java.util.NoSuchElementException) } fun `test execute workflow with custom alerts and finding index with bucket and doc monitor bucket monitor used as chained finding`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt index 606ff9bc5..31b152333 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/SecureWorkflowRestApiIT.kt @@ -9,10 +9,10 @@ import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.HttpHeaders import org.apache.hc.core5.http.io.entity.StringEntity import org.apache.hc.core5.http.message.BasicHeader +import org.apache.lucene.tests.util.LuceneTestCase.AwaitsFix import org.junit.After import org.junit.Before import org.junit.BeforeClass -import org.junit.Ignore import org.opensearch.alerting.ALERTING_BASE_URI import org.opensearch.alerting.ALERTING_DELETE_WORKFLOW_ACCESS import org.opensearch.alerting.ALERTING_EXECUTE_WORKFLOW_ACCESS @@ -63,7 +63,7 @@ import org.opensearch.test.junit.annotations.TestLogging import java.time.Instant // TODO investigate flaky nature of tests. not reproducible in local but fails in jenkins CI -@Ignore +@AwaitsFix(bugUrl = "") @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") class SecureWorkflowRestApiIT : AlertingRestTestCase() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 33ae93717..9cd2c5e26 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -1140,4 +1140,49 @@ class WorkflowRestApiIT : AlertingRestTestCase() { val acknowledged = acknowledgeChainedAlertsResponse["success"] as List Assert.assertEquals(acknowledged[0], alerts1[0]["id"]) } + + fun `test run workflow as scheduled job success`() { + val index = createTestIndex() + val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = false + ) + val monitorResponse = createMonitor(monitor) + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + + val createResponse = client().makeRequest("POST", WORKFLOW_ALERTING_BASE_URI, emptyMap(), workflow.toHttpEntity()) + + assertEquals("Create workflow failed", RestStatus.CREATED, createResponse.restStatus()) + + val responseBody = createResponse.asMap() + val createdId = responseBody["_id"] as String + val createdVersion = responseBody["_version"] as Int + + assertNotEquals("response is missing Id", Workflow.NO_ID, createdId) + assertTrue("incorrect version", createdVersion > 0) + assertEquals("Incorrect Location header", "$WORKFLOW_ALERTING_BASE_URI/$createdId", createResponse.getHeader("Location")) + + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_field" : "us-west-2" + }""" + + indexDoc(index, "1", testDoc) + Thread.sleep(80000) + + val findings = searchFindings(monitor.copy(id = monitorResponse.id)) + assertEquals("Findings saved for test monitor", 1, findings.size) + } }