diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index d2a9fea20..702f98d38 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -8,6 +8,7 @@ package org.opensearch.alerting import org.opensearch.action.ActionRequest import org.opensearch.action.ActionResponse import org.opensearch.alerting.action.ExecuteMonitorAction +import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetEmailAccountAction import org.opensearch.alerting.action.GetEmailGroupAction @@ -27,6 +28,7 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction import org.opensearch.alerting.resthandler.RestExecuteMonitorAction +import org.opensearch.alerting.resthandler.RestExecuteWorkflowAction import org.opensearch.alerting.resthandler.RestGetAlertsAction import org.opensearch.alerting.resthandler.RestGetDestinationsAction import org.opensearch.alerting.resthandler.RestGetEmailAccountAction @@ -46,6 +48,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction import org.opensearch.alerting.transport.TransportDeleteWorkflowAction import org.opensearch.alerting.transport.TransportExecuteMonitorAction +import org.opensearch.alerting.transport.TransportExecuteWorkflowAction import org.opensearch.alerting.transport.TransportGetAlertsAction import org.opensearch.alerting.transport.TransportGetDestinationsAction import org.opensearch.alerting.transport.TransportGetEmailAccountAction @@ -60,6 +63,7 @@ import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator +import org.opensearch.alerting.workflow.WorkflowRunnerService import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes @@ -146,6 +150,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R } lateinit var runner: MonitorRunnerService + lateinit var workflowRunner: WorkflowRunnerService lateinit var scheduler: JobScheduler lateinit var sweeper: JobSweeper lateinit var scheduledJobIndices: ScheduledJobIndices @@ -170,6 +175,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestIndexMonitorAction(), RestSearchMonitorAction(settings, clusterService), RestExecuteMonitorAction(), + RestExecuteWorkflowAction(), RestAcknowledgeAlertAction(), RestScheduledJobStatsHandler("_alerting"), RestSearchEmailAccountAction(), @@ -200,7 +206,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java), ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), - ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java) + ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), + ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) ) } @@ -248,6 +255,22 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) .registerConsumers() .registerDestinationSettings() + workflowRunner = WorkflowRunnerService + .registerClusterService(clusterService) + .registerClient(client) + .registerNamedXContentRegistry(xContentRegistry) + .registerScriptService(scriptService) + .registerIndexNameExpressionResolver(indexNameExpressionResolver) + .registerSettings(settings) + .registerThreadPool(threadPool) + .registerAlertIndices(alertIndices) + .registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry)) + .registerTriggerService(TriggerService(scriptService)) + .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) + .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) + .registerWorkflowService(WorkflowService(client, xContentRegistry)) + .registerConsumers() + .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService) scheduler = JobScheduler(threadPool, runner) @@ -263,6 +286,13 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R settings ) + WorkflowMetadataService.initialize( + client, + clusterService, + xContentRegistry, + settings + ) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 0e4183b4e..93989c7aa 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -25,6 +25,7 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.util.getCombinedTriggerRunResult +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert @@ -59,7 +60,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext? ): MonitorRunResult { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") @@ -118,7 +120,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitor, periodStart, periodEnd, - monitorResult.inputResults + monitorResult.inputResults, + workflowRunContext ) if (firstIteration) { firstPageOfInputResults = inputResults @@ -154,7 +157,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitorCtx, periodStart, periodEnd, - !dryrun && monitor.id != Monitor.NO_ID + !dryrun && monitor.id != Monitor.NO_ID, + workflowRunContext ) } else { emptyList() @@ -350,7 +354,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - shouldCreateFinding: Boolean + shouldCreateFinding: Boolean, + workflowRunContext: WorkflowRunContext? = null ): List { monitor.inputs.forEach { input -> if (input is SearchInput) { @@ -361,14 +366,14 @@ object BucketLevelMonitorRunner : MonitorRunner() { for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) { when (aggFactory) { is CompositeAggregationBuilder -> { - var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings + var groupByFields = 0 // if number of fields used to group by > 1 we won't calculate findings val sources = aggFactory.sources() for (source in sources) { - if (grouByFields > 0) { + if (groupByFields > 0) { logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}") return listOf() } - grouByFields++ + groupByFields++ fieldName = source.field() } } @@ -409,7 +414,7 @@ object BucketLevelMonitorRunner : MonitorRunner() { sr.source().query(queryBuilder) } val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) } - return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding) + return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, workflowRunContext?.executionId) } else { logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}") } @@ -422,7 +427,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { searchResponse: SearchResponse, monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - shouldCreateFinding: Boolean + shouldCreateFinding: Boolean, + workflowExecutionId: String? = null ): List { val docIdsByIndexName: MutableMap> = mutableMapOf() for (hit in searchResponse.hits.hits) { @@ -441,7 +447,8 @@ object BucketLevelMonitorRunner : MonitorRunner() { monitorName = monitor.name, index = it.key, timestamp = Instant.now(), - docLevelQueries = listOf() + docLevelQueries = listOf(), + executionId = workflowExecutionId ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index acd3ded15..bb1435f3a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.ShardRouting @@ -62,7 +63,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext? ): MonitorRunResult { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID @@ -88,7 +90,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata( monitor = monitor, createWithRunContext = false, - skipIndex = isTempMonitor + skipIndex = isTempMonitor, + workflowRunContext?.workflowMetadataId ) val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput @@ -127,6 +130,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } + // Map of document ids per index when monitor is workflow delegate and has chained findings + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + indices.forEach { indexName -> // Prepare lastRunContext for each index val indexLastRunContext = lastRunContext.getOrPut(indexName) { @@ -161,7 +167,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Prepare DocumentExecutionContext for each index val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName) + val matchingDocs = getMatchingDocs( + monitor, + monitorCtx, + docExecutionContext, + indexName, + matchingDocIdsPerIndex?.get(indexName) + ) if (matchingDocs.isNotEmpty()) { val matchedQueriesForDocs = getMatchedQueries( @@ -214,7 +226,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { idQueryMap, docsToQueries, queryToDocIds, - dryrun + dryrun, + workflowRunContext?.executionId ) } @@ -238,7 +251,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { idQueryMap: Map, docsToQueries: Map>, queryToDocIds: Map>, - dryrun: Boolean + dryrun: Boolean, + workflowExecutionId: String? = null ): DocumentLevelTriggerRunResult { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) @@ -249,7 +263,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // TODO: Implement throttling for findings docsToQueries.forEach { val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID) + val findingId = createFindings( + monitor, + monitorCtx, + triggeredQueries, + it.key, + !dryrun && monitor.id != Monitor.NO_ID, + workflowExecutionId + ) findings.add(findingId) if (triggerResult.triggeredDocs.contains(it.key)) { @@ -329,7 +350,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, docLevelQueries: List, matchingDocId: String, - shouldCreateFinding: Boolean + shouldCreateFinding: Boolean, + workflowExecutionId: String? = null, ): String { // Before the "|" is the doc id and after the "|" is the index val docIndex = matchingDocId.split("|") @@ -341,7 +363,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorName = monitor.name, index = docIndex[1], docLevelQueries = docLevelQueries, - timestamp = Instant.now() + timestamp = Instant.now(), + executionId = workflowExecutionId ) val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() @@ -458,7 +481,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, - index: String + index: String, + docIds: List? = null ): List> { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int val matchingDocs = mutableListOf>() @@ -474,7 +498,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { shard, prevSeqNo, maxSeqNo, - null + null, + docIds ) if (hits.hits.isNotEmpty()) { @@ -493,7 +518,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String? + query: String?, + docIds: List? = null ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -505,6 +531,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) } + if (!docIds.isNullOrEmpty()) { + boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) + } + val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index e2b464064..5013b1eff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -16,6 +16,7 @@ import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.executeTransportAction import org.opensearch.alerting.util.toMap +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput @@ -26,6 +27,11 @@ import org.opensearch.commons.alerting.model.ClusterMetricsInput import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.index.query.BoolQueryBuilder +import org.opensearch.index.query.MatchQueryBuilder +import org.opensearch.index.query.QueryBuilder +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.ScriptType @@ -47,12 +53,16 @@ class InputService( monitor: Monitor, periodStart: Instant, periodEnd: Instant, - prevResult: InputRunResults? = null + prevResult: InputRunResults? = null, + workflowRunContext: WorkflowRunContext? = null ): InputRunResults { return try { val results = mutableListOf>() val aggTriggerAfterKey: MutableMap = mutableMapOf() + // If monitor execution is triggered from a workflow + val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + // TODO: If/when multiple input queries are supported for Bucket-Level Monitor execution, aggTriggerAfterKeys will // need to be updated to account for it monitor.inputs.forEach { input -> @@ -63,9 +73,17 @@ class InputService( "period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli() ) + // Deep copying query before passing it to rewriteQuery since otherwise, the monitor.input is modified directly // which causes a strange bug where the rewritten query persists on the Monitor across executions val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers) + + // Rewrite query to consider the doc ids per given index + if (chainedFindingExist(matchingDocIdsPerIndex) && rewrittenQuery.query() != null) { + val updatedSourceQuery = updateInputQueryWithFindingDocIds(rewrittenQuery.query(), matchingDocIdsPerIndex!!) + rewrittenQuery.query(updatedSourceQuery) + } + val searchSource = scriptService.compile( Script( ScriptType.INLINE, @@ -107,6 +125,35 @@ class InputService( } } + /** + * Extends the given query builder with query that filters the given indices with the given doc ids per index + * Used whenever we want to select the documents that were found in chained delegate execution of the current workflow run + * + * @param query Original bucket monitor query + * @param matchingDocIdsPerIndex Map of finding doc ids grouped by index + */ + private fun updateInputQueryWithFindingDocIds( + query: QueryBuilder, + matchingDocIdsPerIndex: Map>, + ): QueryBuilder { + val queryBuilder = QueryBuilders.boolQuery().must(query) + val shouldQuery = QueryBuilders.boolQuery() + + matchingDocIdsPerIndex.forEach { entry -> + shouldQuery + .should() + .add( + BoolQueryBuilder() + .must(MatchQueryBuilder("_index", entry.key)) + .must(TermsQueryBuilder("_id", entry.value)) + ) + } + return queryBuilder.must(shouldQuery) + } + + private fun chainedFindingExist(indexToDocIds: Map>?) = + !indexToDocIds.isNullOrEmpty() + private fun deepCopyQuery(query: SearchSourceBuilder): SearchSourceBuilder { val out = BytesStreamOutput() query.writeTo(out) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index c9e8992c2..024bcdc50 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -110,18 +110,24 @@ object MonitorMetadataService : } } + /** + * Document monitors are keeping the context of the last run. + * Since one monitor can be part of multiple workflows we need to be sure that execution of the current workflow + * doesn't interfere with the other workflows that are dependent on the given monitor + */ suspend fun getOrCreateMetadata( monitor: Monitor, createWithRunContext: Boolean = true, - skipIndex: Boolean = false + skipIndex: Boolean = false, + workflowMetadataId: String? = null ): Pair { try { val created = true - val metadata = getMetadata(monitor) + val metadata = getMetadata(monitor, workflowMetadataId) return if (metadata != null) { metadata to !created } else { - val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext) + val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext, workflowMetadataId) if (skipIndex) { newMetadata to created } else { @@ -133,9 +139,9 @@ object MonitorMetadataService : } } - suspend fun getMetadata(monitor: Monitor): MonitorMetadata? { + suspend fun getMetadata(monitor: Monitor, workflowMetadataId: String? = null): MonitorMetadata? { try { - val metadataId = MonitorMetadata.getId(monitor) + val metadataId = MonitorMetadata.getId(monitor, workflowMetadataId) val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(monitor.id) val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } @@ -168,19 +174,19 @@ object MonitorMetadataService : val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap>) } else null - if (runContext != null) { - return metadata.copy( + return if (runContext != null) { + metadata.copy( lastRunContext = runContext ) } else { - return metadata + metadata } } catch (e: Exception) { throw AlertingException.wrap(e) } } - private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean): MonitorMetadata { + private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean, workflowMetadataId: String? = null): MonitorMetadata { val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { (monitor.inputs[0] as DocLevelMonitorInput).indices[0] } else null @@ -189,7 +195,7 @@ object MonitorMetadataService : createFullRunContext(monitorIndex) } else emptyMap() return MonitorMetadata( - id = "${monitor.id}-metadata", + id = MonitorMetadata.getId(monitor, workflowMetadataId), seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitorId = monitor.id, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 8d9aca954..419962578 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.util.destinationmigration.publishLegacyNotificati import org.opensearch.alerting.util.destinationmigration.sendNotification import org.opensearch.alerting.util.isAllowed import org.opensearch.alerting.util.isTestAction +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.node.NodeClient import org.opensearch.common.Strings import org.opensearch.commons.alerting.model.Monitor @@ -39,7 +40,8 @@ abstract class MonitorRunner { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryRun: Boolean + dryRun: Boolean, + workflowRunContext: WorkflowRunContext? = null ): MonitorRunResult<*> suspend fun runAction( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 67fa29877..41a26bb79 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -35,6 +35,7 @@ data class MonitorRunnerExecutionContext( var triggerService: TriggerService? = null, var alertService: AlertService? = null, var docLevelMonitorQueries: DocLevelMonitorQueries? = null, + var workflowService: WorkflowService? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index ea5e173a0..884ad372d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -12,6 +12,7 @@ import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.isADMonitor +import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger @@ -25,7 +26,8 @@ object QueryLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, periodStart: Instant, periodEnd: Instant, - dryrun: Boolean + dryrun: Boolean, + workflowRunContext: WorkflowRunContext? ): MonitorRunResult { val roles = MonitorRunnerService.getRolesForMonitor(monitor) logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}") @@ -48,7 +50,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { if (!isADMonitor(monitor)) { withClosableContext(InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)) { monitorResult = monitorResult.copy( - inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd) + inputResults = monitorCtx.inputService!!.collectInputResults(monitor, periodStart, periodEnd, null, workflowRunContext) ) } } else { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt new file mode 100644 index 000000000..056830af7 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt @@ -0,0 +1,169 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.model.WorkflowMetadata +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.rest.RestStatus +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.UUID + +object WorkflowMetadataService : + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("WorkflowMetadataService")) { + private val log = LogManager.getLogger(this::class.java) + + private lateinit var client: Client + private lateinit var xContentRegistry: NamedXContentRegistry + private lateinit var clusterService: ClusterService + private lateinit var settings: Settings + + @Volatile private lateinit var indexTimeout: TimeValue + + fun initialize( + client: Client, + clusterService: ClusterService, + xContentRegistry: NamedXContentRegistry, + settings: Settings + ) { + this.clusterService = clusterService + this.client = client + this.xContentRegistry = xContentRegistry + this.settings = settings + this.indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings) + this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it } + } + + @Suppress("ComplexMethod", "ReturnCount") + suspend fun upsertWorkflowMetadata(metadata: WorkflowMetadata, updating: Boolean): WorkflowMetadata { + try { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .id(metadata.id) + .routing(metadata.workflowId) + .timeout(indexTimeout) + + if (updating) { + indexRequest.id(metadata.id) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) + } + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + } + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted WorkflowMetadata:${metadata.id} ") + } + } + return metadata + } catch (e: Exception) { + // If the update is set to false and id is set conflict exception will be thrown + if (e is OpenSearchException && e.status() == RestStatus.CONFLICT && !updating) { + log.debug("Metadata with ${metadata.id} for workflow ${metadata.workflowId} already exist. Instead of creating new, updating existing metadata will be performed") + return upsertWorkflowMetadata(metadata, true) + } + log.error("Error saving metadata", e) + throw AlertingException.wrap(e) + } + } + + suspend fun getOrCreateWorkflowMetadata( + workflow: Workflow, + skipIndex: Boolean = false, + executionId: String + ): Pair { + try { + val created = true + val metadata = getWorkflowMetadata(workflow) + return if (metadata != null) { + metadata to !created + } else { + val newMetadata = createNewWorkflowMetadata(workflow, executionId, skipIndex) + if (skipIndex) { + newMetadata to created + } else { + upsertWorkflowMetadata(newMetadata, updating = false) to created + } + } + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + } + + private suspend fun getWorkflowMetadata(workflow: Workflow): WorkflowMetadata? { + try { + val metadataId = WorkflowMetadata.getId(workflow.id) + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(workflow.id) + + val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) } + return if (getResponse.isExists) { + val xcp = XContentHelper.createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + WorkflowMetadata.parse(xcp) + } else { + null + } + } catch (e: Exception) { + if (e.message?.contains("no such index") == true) { + return null + } else { + throw AlertingException.wrap(e) + } + } + } + + private fun createNewWorkflowMetadata(workflow: Workflow, executionId: String, isTempWorkflow: Boolean): WorkflowMetadata { + val id = if (isTempWorkflow) "${LocalDateTime.now(ZoneOffset.UTC)}${UUID.randomUUID()}" else workflow.id + return WorkflowMetadata( + id = WorkflowMetadata.getId(id), + workflowId = workflow.id, + monitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds(), + latestRunTime = Instant.now(), + latestExecutionId = executionId + ) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt new file mode 100644 index 000000000..0fa93587f --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowService.kt @@ -0,0 +1,130 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchException +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.Finding +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.query.QueryBuilders +import org.opensearch.search.builder.SearchSourceBuilder + +private val log = LogManager.getLogger(WorkflowService::class.java) + +/** + * Contains util methods used in workflow execution + */ +class WorkflowService( + val client: Client, + val xContentRegistry: NamedXContentRegistry, +) { + /** + * Returns finding doc ids per index for the given workflow execution + * Used for pre-filtering the dataset in the case of creating a workflow with chained findings + * + * @param chainedMonitor Monitor that is previously executed + * @param workflowExecutionId Execution id of the current workflow + */ + suspend fun getFindingDocIdsByExecutionId(chainedMonitor: Monitor, workflowExecutionId: String): Map> { + try { + // Search findings index per monitor and workflow execution id + val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termQuery(Finding.MONITOR_ID_FIELD, chainedMonitor.id)) + .filter(QueryBuilders.termQuery(Finding.EXECUTION_ID_FIELD, workflowExecutionId)) + val searchRequest = SearchRequest() + .source( + SearchSourceBuilder() + .query(bqb) + .version(true) + .seqNoAndPrimaryTerm(true) + ) + .indices(chainedMonitor.dataSources.findingsIndex) + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + + // Get the findings docs + val findings = mutableListOf() + for (hit in searchResponse.hits) { + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val finding = Finding.parse(xcp) + findings.add(finding) + } + // Based on the findings get the document ids + val indexToRelatedDocIdsMap = mutableMapOf>() + for (finding in findings) { + indexToRelatedDocIdsMap.getOrPut(finding.index) { mutableListOf() }.addAll(finding.relatedDocIds) + } + return indexToRelatedDocIdsMap + } catch (t: Exception) { + log.error("Error getting finding doc ids: ${t.message}", t) + throw AlertingException.wrap(t) + } + } + + /** + * Returns the list of monitors for the given ids + * Used in workflow execution in order to figure out the monitor type + * + * @param monitors List of monitor ids + * @param size Expected number of monitors + */ + suspend fun getMonitorsById(monitors: List, size: Int): List { + try { + val bqb = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", monitors)) + + val searchRequest = SearchRequest() + .source( + SearchSourceBuilder() + .query(bqb) + .version(true) + .seqNoAndPrimaryTerm(true) + .size(size) + ) + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + + val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + return parseMonitors(searchResponse) + } catch (e: Exception) { + log.error("Error getting monitors: ${e.message}", e) + throw AlertingException.wrap(e) + } + } + + private fun parseMonitors(response: SearchResponse): List { + if (response.isTimedOut) { + log.error("Request for getting monitors timeout") + throw OpenSearchException("Cannot determine that the ${ScheduledJob.SCHEDULED_JOBS_INDEX} index is healthy") + } + val monitors = mutableListOf() + try { + for (hit in response.hits) { + XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.sourceAsString + ).use { hitsParser -> + val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor + monitors.add(monitor) + } + } + } catch (e: Exception) { + log.error("Error parsing monitors: ${e.message}", e) + throw AlertingException.wrap(e) + } + return monitors + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowAction.kt new file mode 100644 index 000000000..efed1087d --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionType + +class ExecuteWorkflowAction private constructor() : ActionType(NAME, ::ExecuteWorkflowResponse) { + companion object { + val INSTANCE = ExecuteWorkflowAction() + const val NAME = "cluster:admin/opensearch/alerting/workflow/execute" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowRequest.kt new file mode 100644 index 000000000..2d97bbdcc --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowRequest.kt @@ -0,0 +1,70 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.model.Workflow +import java.io.IOException + +/** + * A class containing workflow details. + */ +class ExecuteWorkflowRequest : ActionRequest { + val dryrun: Boolean + val requestEnd: TimeValue + val workflowId: String? + val workflow: Workflow? + + constructor( + dryrun: Boolean, + requestEnd: TimeValue, + workflowId: String?, + workflow: Workflow?, + ) : super() { + this.dryrun = dryrun + this.requestEnd = requestEnd + this.workflowId = workflowId + this.workflow = workflow + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readBoolean(), + sin.readTimeValue(), + sin.readOptionalString(), + if (sin.readBoolean()) { + Workflow.readFrom(sin) + } else null + ) + + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (workflowId == null && workflow == null) { + validationException = ValidateActions.addValidationError( + "Both workflow and workflow id are missing", validationException + ) + } + return validationException + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeBoolean(dryrun) + out.writeTimeValue(requestEnd) + out.writeOptionalString(workflowId) + if (workflow != null) { + out.writeBoolean(true) + workflow.writeTo(out) + } else { + out.writeBoolean(false) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowResponse.kt new file mode 100644 index 000000000..300f15f75 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/ExecuteWorkflowResponse.kt @@ -0,0 +1,39 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionResponse +import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class ExecuteWorkflowResponse : ActionResponse, ToXContentObject { + val workflowRunResult: WorkflowRunResult + constructor( + workflowRunResult: WorkflowRunResult + ) : super() { + this.workflowRunResult = workflowRunResult + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + WorkflowRunResult(sin) + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + workflowRunResult.writeTo(out) + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return workflowRunResult.toXContent(builder, params) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt index 30a7f742c..3acc2e318 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt @@ -120,8 +120,9 @@ data class MonitorMetadata( return MonitorMetadata(sin) } - fun getId(monitor: Monitor): String { - return monitor.id + "-metadata" + fun getId(monitor: Monitor, workflowId: String? = null): String { + return if (workflowId.isNullOrEmpty()) "${monitor.id}-metadata" + else "${monitor.id}-$workflowId-metadata" } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt new file mode 100644 index 000000000..ba4cf1201 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowMetadata.kt @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.util.instant +import org.opensearch.commons.alerting.util.optionalTimeField +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import java.io.IOException +import java.time.Instant + +data class WorkflowMetadata( + val id: String, + val workflowId: String, + val monitorIds: List, + val latestRunTime: Instant, + val latestExecutionId: String +) : Writeable, ToXContent { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), + workflowId = sin.readString(), + monitorIds = sin.readStringList(), + latestRunTime = sin.readInstant(), + latestExecutionId = sin.readString() + ) + + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(workflowId) + out.writeStringCollection(monitorIds) + out.writeInstant(latestRunTime) + out.writeString(latestExecutionId) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + if (params.paramAsBoolean("with_type", false)) builder.startObject(METADATA) + builder.field(WORKFLOW_ID_FIELD, workflowId) + .field(MONITOR_IDS_FIELD, monitorIds) + .optionalTimeField(LATEST_RUN_TIME, latestRunTime) + .field(LATEST_EXECUTION_ID, latestExecutionId) + if (params.paramAsBoolean("with_type", false)) builder.endObject() + return builder.endObject() + } + + companion object { + const val METADATA = "workflow_metadata" + const val WORKFLOW_ID_FIELD = "workflow_id" + const val MONITOR_IDS_FIELD = "monitor_ids" + const val LATEST_RUN_TIME = "latest_run_time" + const val LATEST_EXECUTION_ID = "latest_execution_id" + + @JvmStatic @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser): WorkflowMetadata { + lateinit var workflowId: String + var monitorIds = mutableListOf() + lateinit var latestRunTime: Instant + lateinit var latestExecutionId: String + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + WORKFLOW_ID_FIELD -> workflowId = xcp.text() + MONITOR_IDS_FIELD -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + monitorIds.add(xcp.text()) + } + } + LATEST_RUN_TIME -> latestRunTime = xcp.instant()!! + LATEST_EXECUTION_ID -> latestExecutionId = xcp.text() + } + } + return WorkflowMetadata( + id = "$workflowId-metadata", + workflowId = workflowId, + monitorIds = monitorIds, + latestRunTime = latestRunTime, + latestExecutionId = latestExecutionId + ) + } + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): WorkflowMetadata { + return WorkflowMetadata(sin) + } + + fun getId(workflowId: String? = null) = "$workflowId-metadata" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowRunResult.kt new file mode 100644 index 000000000..9ef7a3536 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/WorkflowRunResult.kt @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException +import java.lang.Exception +import java.time.Instant + +data class WorkflowRunResult( + val workflowRunResult: List> = mutableListOf(), + val executionStartTime: Instant, + val executionEndTime: Instant? = null, + val executionId: String, + val error: Exception? = null +) : Writeable, ToXContent { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + sin.readList> { s: StreamInput -> MonitorRunResult.readFrom(s) }, + sin.readInstant(), + sin.readInstant(), + sin.readString(), + sin.readException() + ) + + override fun writeTo(out: StreamOutput) { + out.writeList(workflowRunResult) + out.writeInstant(executionStartTime) + out.writeInstant(executionEndTime) + out.writeString(executionId) + out.writeException(error) + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject().startArray("workflow_run_result") + for (monitorResult in workflowRunResult) { + monitorResult.toXContent(builder, ToXContent.EMPTY_PARAMS) + } + builder.endArray().field("execution_start_time", executionStartTime) + .field("execution_end_time", executionEndTime) + .field("error", error?.message).endObject() + return builder + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteWorkflowAction.kt new file mode 100644 index 000000000..4787fdbdb --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteWorkflowAction.kt @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.action.ExecuteWorkflowAction +import org.opensearch.alerting.action.ExecuteWorkflowRequest +import org.opensearch.client.node.NodeClient +import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener +import java.time.Instant + +private val log = LogManager.getLogger(RestExecuteWorkflowAction::class.java) + +class RestExecuteWorkflowAction : BaseRestHandler() { + + override fun getName(): String = "execute_workflow_action" + + override fun routes(): List { + return listOf( + RestHandler.Route(RestRequest.Method.POST, "${AlertingPlugin.WORKFLOW_BASE_URI}/{workflowID}/_execute") + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} ${AlertingPlugin.WORKFLOW_BASE_URI}/_execute") + + return RestChannelConsumer { channel -> + val dryrun = request.paramAsBoolean("dryrun", false) + val requestEnd = request.paramAsTime("period_end", TimeValue(Instant.now().toEpochMilli())) + + if (request.hasParam("workflowID")) { + val workflowId = request.param("workflowID") + val execWorkflowRequest = ExecuteWorkflowRequest(dryrun, requestEnd, workflowId, null) + client.execute(ExecuteWorkflowAction.INSTANCE, execWorkflowRequest, RestToXContentListener(channel)) + } else { + val xcp = request.contentParser() + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val workflow = Workflow.parse(xcp, Workflow.NO_ID, Workflow.NO_VERSION) + val execWorkflowRequest = ExecuteWorkflowRequest(dryrun, requestEnd, null, workflow) + client.execute(ExecuteWorkflowAction.INSTANCE, execWorkflowRequest, RestToXContentListener(channel)) + } + } + } + + override fun responseParams(): Set { + return setOf("dryrun", "period_end", "workflowID") + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index fb7ec9664..0bc223cfa 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -152,9 +152,7 @@ class TransportDeleteWorkflowAction @Inject constructor( val deleteResponse = deleteWorkflow(workflow) if (deleteDelegateMonitors == true) { - if (user == null) { - deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) - } else { + if (user != null && filterByEnabled) { // Un-stash the context withClosableContext( InjectorContextElement( @@ -167,6 +165,8 @@ class TransportDeleteWorkflowAction @Inject constructor( ) { deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) } + } else { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) } } actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt new file mode 100644 index 000000000..bd90d569a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt @@ -0,0 +1,124 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.ActionListener +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.alerting.MonitorRunnerService +import org.opensearch.alerting.action.ExecuteWorkflowAction +import org.opensearch.alerting.action.ExecuteWorkflowRequest +import org.opensearch.alerting.action.ExecuteWorkflowResponse +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.workflow.WorkflowRunnerService +import org.opensearch.client.Client +import org.opensearch.common.inject.Inject +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.time.Instant + +private val log = LogManager.getLogger(TransportExecuteWorkflowAction::class.java) + +class TransportExecuteWorkflowAction @Inject constructor( + transportService: TransportService, + private val client: Client, + private val runner: MonitorRunnerService, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry +) : HandledTransportAction( + ExecuteWorkflowAction.NAME, transportService, actionFilters, ::ExecuteWorkflowRequest +) { + override fun doExecute(task: Task, execWorkflowRequest: ExecuteWorkflowRequest, actionListener: ActionListener) { + val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + log.debug("User and roles string from thread context: $userStr") + val user: User? = User.parse(userStr) + + client.threadPool().threadContext.stashContext().use { + val executeWorkflow = fun(workflow: Workflow) { + runner.launch { + val (periodStart, periodEnd) = + workflow.schedule.getPeriodEndingAt(Instant.ofEpochMilli(execWorkflowRequest.requestEnd.millis)) + try { + val workflowRunResult = + WorkflowRunnerService.runJob(workflow, periodStart, periodEnd, execWorkflowRequest.dryrun) + withContext(Dispatchers.IO) { + actionListener.onResponse( + ExecuteWorkflowResponse( + workflowRunResult + ) + ) + } + } catch (e: Exception) { + log.error("Unexpected error running workflow", e) + withContext(Dispatchers.IO) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + } + } + + if (execWorkflowRequest.workflowId != null) { + val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execWorkflowRequest.workflowId) + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + log.error("Can't find workflow with id: ${response.id}") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Can't find workflow with id: ${response.id}", + RestStatus.NOT_FOUND + ) + ) + ) + return + } + if (!response.isSourceEmpty) { + XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + response.sourceAsBytesRef, XContentType.JSON + ).use { xcp -> + val workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow + executeWorkflow(workflow) + } + } + } + + override fun onFailure(t: Exception) { + log.error("Error getting workflow ${execWorkflowRequest.workflowId}", t) + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } else { + val workflow = when (user?.name.isNullOrEmpty()) { + true -> execWorkflowRequest.workflow as Workflow + false -> (execWorkflowRequest.workflow as Workflow).copy(user = user) + } + + executeWorkflow(workflow) + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index 5fdd4490a..a68a32ab8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -604,7 +604,7 @@ class TransportIndexWorkflowAction @Inject constructor( val searchSource = SearchSourceBuilder().query(query) val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) - if (user != null && !isAdmin(user) && filterByEnabled) { + if (user != null && filterByEnabled) { addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") } @@ -647,9 +647,7 @@ class TransportIndexWorkflowAction @Inject constructor( val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) - if (user == null) { - checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) - } else { + if (user != null && filterByEnabled) { // Unstash the context and check if user with specified roles has indices access withClosableContext( InjectorContextElement( @@ -662,6 +660,8 @@ class TransportIndexWorkflowAction @Inject constructor( ) { checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) } + } else { + checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt new file mode 100644 index 000000000..006cd6e5b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -0,0 +1,173 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.workflow + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.BucketLevelMonitorRunner +import org.opensearch.alerting.DocumentLevelMonitorRunner +import org.opensearch.alerting.MonitorRunnerExecutionContext +import org.opensearch.alerting.QueryLevelMonitorRunner +import org.opensearch.alerting.WorkflowMetadataService +import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.isDocLevelMonitor +import org.opensearch.alerting.util.isQueryLevelMonitor +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneOffset +import java.util.UUID + +object CompositeWorkflowRunner : WorkflowRunner() { + private val logger = LogManager.getLogger(javaClass) + + override suspend fun runWorkflow( + workflow: Workflow, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean + ): WorkflowRunResult { + val workflowExecutionStartTime = Instant.now() + + val isTempWorkflow = dryRun || workflow.id == Workflow.NO_ID + + val executionId = generateExecutionId(isTempWorkflow, workflow) + + val (workflowMetadata, _) = WorkflowMetadataService.getOrCreateWorkflowMetadata( + workflow = workflow, + skipIndex = isTempWorkflow, + executionId = executionId + ) + + var workflowResult = WorkflowRunResult(mutableListOf(), workflowExecutionStartTime, null, executionId) + + logger.debug("Workflow ${workflow.id} in $executionId execution is running") + val delegates = (workflow.inputs[0] as CompositeInput).sequence.delegates.sortedBy { it.order } + var monitors: List + + try { + monitors = monitorCtx.workflowService!!.getMonitorsById(delegates.map { it.monitorId }, delegates.size) + } catch (e: Exception) { + logger.error("Failed getting workflow delegates. Error: ${e.message}", e) + return workflowResult.copy(error = AlertingException.wrap(e)) + } + // Validate the monitors size + validateMonitorSize(delegates, monitors, workflow) + + val monitorsById = monitors.associateBy { it.id } + val resultList = mutableListOf>() + var lastErrorDelegateRun: Exception? = null + + for (delegate in delegates) { + var indexToDocIds = mapOf>() + var delegateMonitor: Monitor + delegateMonitor = monitorsById[delegate.monitorId] + ?: throw AlertingException.wrap( + IllegalStateException("Delegate monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + if (delegate.chainedMonitorFindings != null) { + val chainedMonitor = monitorsById[delegate.chainedMonitorFindings!!.monitorId] + ?: throw AlertingException.wrap( + IllegalStateException("Chained finding monitor not found ${delegate.monitorId} for the workflow $workflow.id") + ) + + try { + indexToDocIds = monitorCtx.workflowService!!.getFindingDocIdsByExecutionId(chainedMonitor, executionId) + } catch (e: Exception) { + logger.error("Failed to execute workflow. Error: ${e.message}", e) + return workflowResult.copy(error = AlertingException.wrap(e)) + } + } + + val workflowRunContext = WorkflowRunContext( + workflowId = workflowMetadata.workflowId, + workflowMetadataId = workflowMetadata.id, + chainedMonitorId = delegate.chainedMonitorFindings?.monitorId, + executionId = executionId, + matchingDocIdsPerIndex = indexToDocIds + ) + + var delegateRunResult: MonitorRunResult<*>? + try { + delegateRunResult = if (delegateMonitor.isBucketLevelMonitor()) { + BucketLevelMonitorRunner.runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext + ) + } else if (delegateMonitor.isDocLevelMonitor()) { + DocumentLevelMonitorRunner.runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext + ) + } else if (delegateMonitor.isQueryLevelMonitor()) { + QueryLevelMonitorRunner.runMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext + ) + } else { + throw AlertingException.wrap( + IllegalStateException("Unsupported monitor type") + ) + } + } catch (ex: Exception) { + logger.error("Error executing workflow delegate. Error: ${ex.message}", ex) + lastErrorDelegateRun = AlertingException.wrap(ex) + continue + } + if (delegateRunResult != null) resultList.add(delegateRunResult) + } + logger.debug("Workflow ${workflow.id} in $executionId finished") + // Update metadata only if the workflow is not temp + if (!isTempWorkflow) { + WorkflowMetadataService.upsertWorkflowMetadata( + workflowMetadata.copy(latestRunTime = workflowExecutionStartTime, latestExecutionId = executionId), + true + ) + } + + return workflowResult.copy(workflowRunResult = resultList, executionEndTime = Instant.now(), error = lastErrorDelegateRun) + } + + private fun generateExecutionId( + isTempWorkflow: Boolean, + workflow: Workflow, + ): String { + val randomPart = "${LocalDateTime.now(ZoneOffset.UTC)}${UUID.randomUUID()}" + return if (isTempWorkflow) randomPart else workflow.id.plus(randomPart) + } + + private fun validateMonitorSize( + delegates: List, + monitors: List, + workflow: Workflow, + ) { + if (delegates.size != monitors.size) { + val diffMonitorIds = delegates.map { it.monitorId }.minus(monitors.map { it.id }.toSet()).joinToString() + logger.error("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + throw AlertingException.wrap( + IllegalStateException("Delegate monitors don't exist $diffMonitorIds for the workflow $workflow.id") + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt new file mode 100644 index 000000000..60285e70b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunContext.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.workflow + +data class WorkflowRunContext( + // In case of dry run it's random generated id, while in other cases it's workflowId + val workflowId: String, + val workflowMetadataId: String, + val chainedMonitorId: String?, + val executionId: String, + val matchingDocIdsPerIndex: Map> +) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt new file mode 100644 index 000000000..a7272a3dc --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunner.kt @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.workflow + +import org.opensearch.alerting.MonitorRunnerExecutionContext +import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.commons.alerting.model.Workflow +import java.time.Instant + +abstract class WorkflowRunner { + abstract suspend fun runWorkflow( + workflow: Workflow, + monitorCtx: MonitorRunnerExecutionContext, + periodStart: Instant, + periodEnd: Instant, + dryRun: Boolean + ): WorkflowRunResult +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt new file mode 100644 index 000000000..5ea979119 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/WorkflowRunnerService.kt @@ -0,0 +1,252 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.workflow + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.alerting.AlertService +import org.opensearch.alerting.InputService +import org.opensearch.alerting.MonitorRunnerExecutionContext +import org.opensearch.alerting.TriggerService +import org.opensearch.alerting.WorkflowService +import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.JobRunner +import org.opensearch.alerting.model.WorkflowRunResult +import org.opensearch.alerting.model.destination.DestinationContextFactory +import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS +import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST +import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST +import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings +import org.opensearch.alerting.util.DocLevelMonitorQueries +import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.component.AbstractLifecycleComponent +import org.opensearch.common.settings.Settings +import org.opensearch.commons.alerting.model.Alert +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.action.Action +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.script.Script +import org.opensearch.script.ScriptService +import org.opensearch.script.TemplateScript +import org.opensearch.threadpool.ThreadPool +import java.time.Instant +import kotlin.coroutines.CoroutineContext + +object WorkflowRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() { + + private val logger = LogManager.getLogger(javaClass) + + var monitorCtx: MonitorRunnerExecutionContext = MonitorRunnerExecutionContext() + private lateinit var runnerSupervisor: Job + override val coroutineContext: CoroutineContext + get() = Dispatchers.Default + runnerSupervisor + + fun registerClusterService(clusterService: ClusterService): WorkflowRunnerService { + monitorCtx.clusterService = clusterService + return this + } + + fun registerClient(client: Client): WorkflowRunnerService { + monitorCtx.client = client + return this + } + + fun registerNamedXContentRegistry(xContentRegistry: NamedXContentRegistry): WorkflowRunnerService { + monitorCtx.xContentRegistry = xContentRegistry + return this + } + + fun registerScriptService(scriptService: ScriptService): WorkflowRunnerService { + monitorCtx.scriptService = scriptService + return this + } + + fun registerIndexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): WorkflowRunnerService { + monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver + return this + } + + fun registerSettings(settings: Settings): WorkflowRunnerService { + monitorCtx.settings = settings + return this + } + + fun registerThreadPool(threadPool: ThreadPool): WorkflowRunnerService { + monitorCtx.threadPool = threadPool + return this + } + + fun registerAlertIndices(alertIndices: AlertIndices): WorkflowRunnerService { + monitorCtx.alertIndices = alertIndices + return this + } + + fun registerInputService(inputService: InputService): WorkflowRunnerService { + monitorCtx.inputService = inputService + return this + } + + fun registerWorkflowService(workflowService: WorkflowService): WorkflowRunnerService { + monitorCtx.workflowService = workflowService + return this + } + + fun registerTriggerService(triggerService: TriggerService): WorkflowRunnerService { + monitorCtx.triggerService = triggerService + return this + } + + fun registerAlertService(alertService: AlertService): WorkflowRunnerService { + monitorCtx.alertService = alertService + return this + } + + fun registerDocLevelMonitorQueries(docLevelMonitorQueries: DocLevelMonitorQueries): WorkflowRunnerService { + monitorCtx.docLevelMonitorQueries = docLevelMonitorQueries + return this + } + + // Must be called after registerClusterService and registerSettings in AlertingPlugin + fun registerConsumers(): WorkflowRunnerService { + monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( + ALERT_BACKOFF_MILLIS.get(monitorCtx.settings), + ALERT_BACKOFF_COUNT.get(monitorCtx.settings) + ) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count -> + monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count) + } + + monitorCtx.moveAlertsRetryPolicy = + BackoffPolicy.exponentialBackoff( + MOVE_ALERTS_BACKOFF_MILLIS.get(monitorCtx.settings), + MOVE_ALERTS_BACKOFF_COUNT.get(monitorCtx.settings) + ) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(MOVE_ALERTS_BACKOFF_MILLIS, MOVE_ALERTS_BACKOFF_COUNT) { + millis, count -> + monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) + } + + monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) { + monitorCtx.allowList = it + } + + // Host deny list is not a dynamic setting so no consumer is registered but the variable is set here + monitorCtx.hostDenyList = HOST_DENY_LIST.get(monitorCtx.settings) + + monitorCtx.maxActionableAlertCount = MAX_ACTIONABLE_ALERT_COUNT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(MAX_ACTIONABLE_ALERT_COUNT) { + monitorCtx.maxActionableAlertCount = it + } + + monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + + return this + } + + // To be safe, call this last as it depends on a number of other components being registered beforehand (client, settings, etc.) + fun registerDestinationSettings(): WorkflowRunnerService { + monitorCtx.destinationSettings = loadDestinationSettings(monitorCtx.settings!!) + monitorCtx.destinationContextFactory = + DestinationContextFactory(monitorCtx.client!!, monitorCtx.xContentRegistry!!, monitorCtx.destinationSettings!!) + return this + } + + // Updates destination settings when the reload API is called so that new keystore values are visible + fun reloadDestinationSettings(settings: Settings) { + monitorCtx.destinationSettings = loadDestinationSettings(settings) + + // Update destinationContextFactory as well since destinationSettings has been updated + monitorCtx.destinationContextFactory!!.updateDestinationSettings(monitorCtx.destinationSettings!!) + } + + override fun doStart() { + runnerSupervisor = SupervisorJob() + } + + override fun doStop() { + runnerSupervisor.cancel() + } + + override fun doClose() { } + + override fun postIndex(job: ScheduledJob) { + } + + override fun postDelete(jobId: String) { + } + + override fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant) { + if (job !is Workflow) { + throw IllegalArgumentException("Invalid job type") + } + launch { + runJob(job, periodStart, periodEnd, false) + } + } + + suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): WorkflowRunResult { + val workflow = job as Workflow + return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun) + } + + // TODO: See if we can move below methods (or few of these) to a common utils + internal fun getRolesForMonitor(monitor: Monitor): List { + /* + * We need to handle 3 cases: + * 1. Monitors created by older versions and never updated. These monitors wont have User details in the + * monitor object. `monitor.user` will be null. Insert `all_access, AmazonES_all_access` role. + * 2. Monitors are created when security plugin is disabled, these will have empty User object. + * (`monitor.user.name`, `monitor.user.roles` are empty ) + * 3. Monitors are created when security plugin is enabled, these will have an User object. + */ + return if (monitor.user == null) { + // fixme: discuss and remove hardcoded to settings? + // TODO: Remove "AmazonES_all_access" role? + monitorCtx.settings!!.getAsList("", listOf("all_access", "AmazonES_all_access")) + } else { + monitor.user!!.roles + } + } + + // TODO: Can this be updated to just use 'Instant.now()'? + // 'threadPool.absoluteTimeInMillis()' is referring to a cached value of System.currentTimeMillis() that by default updates every 200ms + internal fun currentTime() = Instant.ofEpochMilli(monitorCtx.threadPool!!.absoluteTimeInMillis()) + + internal fun isActionActionable(action: Action, alert: Alert?): Boolean { + if (alert == null || action.throttle == null) { + return true + } + if (action.throttleEnabled) { + val result = alert.actionExecutionResults.firstOrNull { r -> r.actionId == action.id } + val lastExecutionTime: Instant? = result?.lastExecutionTime + val throttledTimeBound = currentTime().minus(action.throttle!!.value.toLong(), action.throttle!!.unit) + return (lastExecutionTime == null || lastExecutionTime.isBefore(throttledTimeBound)) + } + return true + } + + internal fun compileTemplate(template: Script, ctx: TriggerExecutionContext): String { + return monitorCtx.scriptService!!.compile(template, TemplateScript.CONTEXT) + .newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg())) + .execute() + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index c9386b2ef..b3f22b5e6 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -1,7 +1,7 @@ { "dynamic": "strict", "_meta" : { - "schema_version": 1 + "schema_version": 2 }, "properties": { "schema_version": { @@ -51,6 +51,9 @@ }, "timestamp": { "type": "long" + }, + "execution_id": { + "type": "keyword" } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index c1e98450e..77515e975 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -14,7 +14,11 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.rest.RestRequest +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.util.Collections +import java.util.UUID class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { @@ -526,6 +530,73 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test delete executed workflow with metadata deleted`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + deleteWorkflow(workflowId, true) + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + // Verify that the workflow metadata is deleted + try { + searchWorkflowMetadata(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetMonitor Action error ", + it.contains("List is empty") + ) + } + } + } + fun `test delete workflow delegate monitor not deleted`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) @@ -664,6 +735,26 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test create workflow with 26 delegates failure`() { + val monitorsIds = mutableListOf() + for (i in 0..25) { + monitorsIds.add(UUID.randomUUID().toString()) + } + val workflow = randomWorkflow( + monitorIds = monitorsIds + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be larger then 25.") + ) + } + } + } + fun `test update workflow without delegate failure`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) @@ -930,6 +1021,46 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test create workflow delegate and chained finding monitor different indices failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val docMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docMonitorResponse = createMonitor(docMonitor)!! + + val index1 = "$index-1" + createTestIndex(index1) + + val docLevelInput1 = DocLevelMonitorInput( + "description", listOf(index1), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + + val docMonitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger) + ) + val docMonitorResponse1 = createMonitor(docMonitor1)!! + + val workflow = randomWorkflow( + monitorIds = listOf(docMonitorResponse1.id, docMonitorResponse.id) + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegate monitor and it's chained finding monitor must query the same indices") + ) + } + } + } + fun `test create workflow when monitor index not initialized failure`() { val delegates = listOf( Delegate(1, "monitor-1") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt new file mode 100644 index 000000000..76371c3a3 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt @@ -0,0 +1,710 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.junit.Assert +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase +import org.opensearch.alerting.util.AlertingException +import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.GetAlertsRequest +import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Table +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.script.Script +import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder +import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder +import org.opensearch.search.builder.SearchSourceBuilder +import java.lang.Exception +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit +import java.util.NoSuchElementException +import java.util.concurrent.ExecutionException + +class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { + + fun `test execute workflow with custom alerts and finding index with doc level delegates`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex1 = "custom_alerts_index" + val customFindingsIndex1 = "custom_findings_index" + val customFindingsIndexPattern1 = "custom_findings_index-1" + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1), + dataSources = DataSources( + alertsIndex = customAlertsIndex1, + findingsIndex = customFindingsIndex1, + findingsIndexPattern = customFindingsIndexPattern1 + ) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex2 = "custom_alerts_index_2" + val customFindingsIndex2 = "custom_findings_index_2" + val customFindingsIndexPattern2 = "custom_findings_index-2" + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + dataSources = DataSources( + alertsIndex = customAlertsIndex2, + findingsIndex = customFindingsIndex2, + findingsIndexPattern = customFindingsIndexPattern2 + ) + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 and monitor2 + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "2", testDoc2) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Doesn't match + val testDoc3 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-east-1" + }""" + indexDoc(index, "3", testDoc3) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + assertEquals(monitor1.name, monitorsRunResults[0].monitorName) + assertEquals(1, monitorsRunResults[0].triggerResults.size) + + Assert.assertEquals(monitor2.name, monitorsRunResults[1].monitorName) + Assert.assertEquals(1, monitorsRunResults[1].triggerResults.size) + + assertAlerts(monitorResponse.id, customAlertsIndex1, 2) + assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2")) + + assertAlerts(monitorResponse2.id, customAlertsIndex2, 1) + assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2")) + } + + fun `test execute workflows with shared monitor delegates`() { + val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customAlertsIndex = "custom_alerts_index" + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + alertsIndex = customAlertsIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var workflow1 = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse1 = upsertWorkflow(workflow1)!! + val workflowById1 = searchWorkflow(workflowResponse1.id) + assertNotNull(workflowById1) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "2", testDoc2) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(1, monitorsRunResults.size) + + assertEquals(monitor.name, monitorsRunResults[0].monitorName) + assertEquals(1, monitorsRunResults[0].triggerResults.size) + + assertAlerts(monitorResponse.id, customAlertsIndex, 2) + assertFindings(monitorResponse.id, customFindingsIndex, 2, 2, listOf("1", "2")) + + val workflowId1 = workflowResponse1.id + val executeWorkflowResponse1 = executeWorkflow(workflowById1, workflowId1, false)!! + val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult + assertEquals(1, monitorsRunResults1.size) + + assertEquals(monitor.name, monitorsRunResults1[0].monitorName) + assertEquals(1, monitorsRunResults1[0].triggerResults.size) + + assertAlerts(monitorResponse.id, customAlertsIndex, 2) + assertFindings(monitorResponse.id, customFindingsIndex, 4, 4, listOf("1", "2", "1", "2")) + } + + fun `test execute workflow verify workflow metadata`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + // First execution + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + val workflowMetadata = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse.workflowRunResult.executionId, + workflowMetadata!!.latestExecutionId + ) + // Second execution + val executeWorkflowResponse1 = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults1.size) + + val workflowMetadata1 = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse1.workflowRunResult.executionId, + workflowMetadata1!!.latestExecutionId + ) + } + + fun `test execute workflow dryrun verify workflow metadata not created`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + + val docQuery2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery2)) + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput2), + triggers = listOf(trigger2), + ) + + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + // First execution + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, true) + assertNotNull("Workflow run result is null", executeWorkflowResponse) + val monitorsRunResults = executeWorkflowResponse!!.workflowRunResult.workflowRunResult + assertEquals(2, monitorsRunResults.size) + + var exception: Exception? = null + try { + searchWorkflowMetadata(id = workflowId) + } catch (ex: Exception) { + exception = ex + } + assertTrue(exception is NoSuchElementException) + } + + fun `test execute workflow with custom alerts and finding index with bucket level doc level delegates when bucket level delegate is used in chained finding`() { + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + val bucketCustomAlertsIndex = "custom_alerts_index" + val bucketCustomFindingsIndex = "custom_findings_index" + val bucketCustomFindingsIndexPattern = "custom_findings_index-1" + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = bucketCustomAlertsIndex, + findingsIndex = bucketCustomFindingsIndex, + findingsIndexPattern = bucketCustomFindingsIndexPattern + ) + ) + )!! + + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1") + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_1\"", name = "2") + val docQuery3 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2, docQuery3)) + val docTrigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val docCustomAlertsIndex = "custom_alerts_index" + val docCustomFindingsIndex = "custom_findings_index" + val docCustomFindingsIndexPattern = "custom_findings_index-1" + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(docTrigger), + dataSources = DataSources( + alertsIndex = docCustomAlertsIndex, + findingsIndex = docCustomFindingsIndex, + findingsIndexPattern = docCustomFindingsIndexPattern + ) + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + // 1. bucketMonitor (chainedFinding = null) 2. docMonitor (chainedFinding = bucketMonitor) + var workflow = randomWorkflow( + monitorIds = listOf(bucketLevelMonitorResponse.id, docLevelMonitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. bucket level monitor should reduce the doc findings to 4 (1, 2, 3, 4) + // 2. Doc level monitor will match those 4 documents although it contains rules for matching all 5 documents (docQuery3 matches the fifth) + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.workflowRunResult) { + if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) { + val searchResult = monitorRunResults.inputResults.results.first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 3, buckets.size) + + assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2) + assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4")) + } else { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult + val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } + val expectedTriggeredDocIds = listOf("1", "2", "3", "4") + assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) + + assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 4) + assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 4, 4, listOf("1", "2", "3", "4")) + } + } + } + + fun `test execute workflow with custom alerts and finding index with bucket level and doc level delegates when doc level delegate is used in chained finding`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1") + val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "2") + + var docLevelMonitor = randomDocumentLevelMonitor( + inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery1, docQuery2))), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), + dataSources = DataSources( + alertsIndex = "custom_alerts_index", + findingsIndex = "custom_findings_index", + findingsIndexPattern = "custom_findings_index-1" + ) + ) + + val docLevelMonitorResponse = createMonitor(docLevelMonitor)!! + + val query = QueryBuilders.rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + val compositeSources = listOf( + TermsValuesSourceBuilder("test_field_1").field("test_field_1") + ) + val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) + val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) + // Bucket level monitor will reduce the size of matched doc ids on those that belong to a bucket that contains more than 1 document after term grouping + val triggerScript = """ + params.docCount > 1 + """.trimIndent() + + var trigger = randomBucketLevelTrigger() + trigger = trigger.copy( + bucketSelector = BucketSelectorExtAggregationBuilder( + name = trigger.id, + bucketsPathsMap = mapOf("docCount" to "_count"), + script = Script(triggerScript), + parentBucketPath = "composite_agg", + filter = null, + ) + ) + + val bucketLevelMonitorResponse = createMonitor( + randomBucketLevelMonitor( + inputs = listOf(input), + enabled = false, + triggers = listOf(trigger), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = "custom_alerts_index", + findingsIndex = "custom_findings_index", + findingsIndexPattern = "custom_findings_index-1" + ) + ) + )!! + + var docLevelMonitor1 = randomDocumentLevelMonitor( + // Match the documents with test_field_1: test_value_3 + inputs = listOf(DocLevelMonitorInput("description", listOf(index), listOf(docQuery2))), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN)), + dataSources = DataSources( + findingsEnabled = true, + alertsIndex = "custom_alerts_index_1", + findingsIndex = "custom_findings_index_1", + findingsIndexPattern = "custom_findings_index_1-1" + ) + ) + + val docLevelMonitorResponse1 = createMonitor(docLevelMonitor1)!! + + val queryMonitorInput = SearchInput( + indices = listOf(index), + query = SearchSourceBuilder().query( + QueryBuilders + .rangeQuery("test_strict_date_time") + .gt("{{period_end}}||-10d") + .lte("{{period_end}}") + .format("epoch_millis") + ) + ) + val queryTriggerScript = """ + return ctx.results[0].hits.hits.size() > 0 + """.trimIndent() + + val queryLevelTrigger = randomQueryLevelTrigger(condition = Script(queryTriggerScript)) + val queryMonitorResponse = createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! + + // 1. docMonitor (chainedFinding = null) 2. bucketMonitor (chainedFinding = docMonitor) 3. docMonitor (chainedFinding = bucketMonitor) 4. queryMonitor (chainedFinding = docMonitor 3) + var workflow = randomWorkflow( + monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id, docLevelMonitorResponse1.id, queryMonitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + // Creates 5 documents + insertSampleTimeSerializedData( + index, + listOf( + "test_value_1", + "test_value_1", // adding duplicate to verify aggregation + "test_value_2", + "test_value_2", + "test_value_3", + "test_value_3" + ) + ) + + val workflowId = workflowResponse.id + // 1. Doc level monitor should reduce the doc findings to 4 (3 - test_value_2, 4 - test_value_2, 5 - test_value_3, 6 - test_value_3) + // 2. Bucket level monitor will match the fetch the docs from current findings execution, although it contains rules for matching documents which has test_value_2 and test value_3 + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + assertNotNull(executeWorkflowResponse) + + for (monitorRunResults in executeWorkflowResponse.workflowRunResult.workflowRunResult) { + when (monitorRunResults.monitorName) { + // Verify first doc level monitor execution, alerts and findings + docLevelMonitorResponse.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult + val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } + val expectedTriggeredDocIds = listOf("3", "4", "5", "6") + assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) + + assertAlerts(docLevelMonitorResponse.id, docLevelMonitorResponse.monitor.dataSources.alertsIndex, 4) + assertFindings(docLevelMonitorResponse.id, docLevelMonitorResponse.monitor.dataSources.findingsIndex, 4, 4, listOf("3", "4", "5", "6")) + } + // Verify second bucket level monitor execution, alerts and findings + bucketLevelMonitorResponse.monitor.name -> { + val searchResult = monitorRunResults.inputResults.results.first() + @Suppress("UNCHECKED_CAST") + val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + assertEquals("Incorrect search result", 2, buckets.size) + + assertAlerts(bucketLevelMonitorResponse.id, bucketLevelMonitorResponse.monitor.dataSources.alertsIndex, 2) + assertFindings(bucketLevelMonitorResponse.id, bucketLevelMonitorResponse.monitor.dataSources.findingsIndex, 1, 4, listOf("3", "4", "5", "6")) + } + // Verify third doc level monitor execution, alerts and findings + docLevelMonitorResponse1.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val docLevelTrigger = values.iterator().next() as DocumentLevelTriggerRunResult + val triggeredDocIds = docLevelTrigger.triggeredDocs.map { it.split("|")[0] } + val expectedTriggeredDocIds = listOf("5", "6") + assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) + + assertAlerts(docLevelMonitorResponse1.id, docLevelMonitorResponse1.monitor.dataSources.alertsIndex, 2) + assertFindings(docLevelMonitorResponse1.id, docLevelMonitorResponse1.monitor.dataSources.findingsIndex, 2, 2, listOf("5", "6")) + } + // Verify fourth query level monitor execution + queryMonitorResponse.monitor.name -> { + assertEquals(1, monitorRunResults.inputResults.results.size) + val values = monitorRunResults.triggerResults.values + assertEquals(1, values.size) + @Suppress("UNCHECKED_CAST") + val totalHits = ((monitorRunResults.inputResults.results[0]["hits"] as Map)["total"] as Map) ["value"] + assertEquals(2, totalHits) + @Suppress("UNCHECKED_CAST") + val docIds = ((monitorRunResults.inputResults.results[0]["hits"] as Map)["hits"] as List>).map { it["_id"]!! } + assertEquals(listOf("5", "6"), docIds.sorted()) + } + } + } + } + + fun `test execute workflow input error`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + deleteIndex(index) + + val response = executeWorkflow(workflowById, workflowById!!.id, false)!! + val error = response.workflowRunResult.workflowRunResult[0].error + assertNotNull(error) + assertTrue(error is AlertingException) + assertEquals(RestStatus.NOT_FOUND, (error as AlertingException).status) + assertEquals("Configured indices are not found: [$index]", (error as AlertingException).message) + } + + fun `test execute workflow wrong workflow id`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + var exception: Exception? = null + val badWorkflowId = getWorkflowResponse.id + "bad" + try { + executeWorkflow(id = badWorkflowId) + } catch (ex: Exception) { + exception = ex + } + assertTrue(exception is ExecutionException) + assertTrue(exception!!.cause is AlertingException) + assertEquals(RestStatus.NOT_FOUND, (exception.cause as AlertingException).status) + assertEquals("Can't find workflow with id: $badWorkflowId", exception.cause!!.message) + } + + private fun assertFindings( + monitorId: String, + customFindingsIndex: String, + findingSize: Int, + matchedQueryNumber: Int, + relatedDocIds: List + ) { + val findings = searchFindings(monitorId, customFindingsIndex) + assertEquals("Findings saved for test monitor", findingSize, findings.size) + + val findingDocIds = findings.flatMap { it.relatedDocIds } + + assertEquals("Didn't match $matchedQueryNumber query", matchedQueryNumber, findingDocIds.size) + assertTrue("Findings saved for test monitor", relatedDocIds.containsAll(findingDocIds)) + } + + private fun assertAlerts( + monitorId: String, + customAlertsIndex: String, + alertSize: Int + ) { + val alerts = searchAlerts(monitorId, customAlertsIndex) + assertEquals("Alert saved for test monitor", alertSize, alerts.size) + val table = Table("asc", "id", null, alertSize, 0, "") + var getAlertsResponse = client() + .execute( + AlertingActions.GET_ALERTS_ACTION_TYPE, + GetAlertsRequest(table, "ALL", "ALL", null, customAlertsIndex) + ) + .get() + assertTrue(getAlertsResponse != null) + assertTrue(getAlertsResponse.alerts.size == alertSize) + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", monitorId, null)) + .get() + assertTrue(getAlertsResponse != null) + assertTrue(getAlertsResponse.alerts.size == alertSize) + + val alertIds = getAlertsResponse.alerts.map { it.id } + val acknowledgeAlertResponse = client().execute( + AlertingActions.ACKNOWLEDGE_ALERTS_ACTION_TYPE, + AcknowledgeAlertRequest(monitorId, alertIds, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + + assertEquals(alertSize, acknowledgeAlertResponse.acknowledged.size) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index 8c4508c69..542a3eb45 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -125,6 +125,25 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { ) } + protected fun createTestIndex(index: String) { + val mapping = XContentFactory.jsonBuilder() + mapping.startObject() + .startObject("properties") + .startObject("test_strict_date_time") + .field("type", "date") + .field("format", "strict_date_time") + .endObject() + .startObject("test_field_1") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + + createIndex( + index, Settings.EMPTY, mapping + ) + } + private fun createIndex( index: String?, settings: Settings?, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 749b71555..4b3bf4e81 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -7,6 +7,11 @@ package org.opensearch.alerting.transport import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.action.ExecuteWorkflowAction +import org.opensearch.alerting.action.ExecuteWorkflowRequest +import org.opensearch.alerting.action.ExecuteWorkflowResponse +import org.opensearch.alerting.model.WorkflowMetadata +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.DeleteWorkflowRequest @@ -21,6 +26,7 @@ import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder +import java.time.Instant /** * A test that keep a singleton node started for all tests that can be used to get @@ -59,6 +65,35 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { }.first() } + protected fun searchWorkflowMetadata( + id: String, + indices: String = ScheduledJob.SCHEDULED_JOBS_INDEX, + refresh: Boolean = true, + ): WorkflowMetadata? { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return null + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder("workflow_metadata.workflow_id", id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { it -> + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + lateinit var workflowMetadata: WorkflowMetadata + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow_metadata" -> workflowMetadata = WorkflowMetadata.parse(xcp) + } + } + workflowMetadata.copy(id = it.id) + }.first() + } + protected fun upsertWorkflow( workflow: Workflow, id: String = Workflow.NO_ID, @@ -88,4 +123,9 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { DeleteWorkflowRequest(workflowId, deleteDelegateMonitors) ).get() } + + protected fun executeWorkflow(workflow: Workflow? = null, id: String? = null, dryRun: Boolean = true): ExecuteWorkflowResponse? { + val request = ExecuteWorkflowRequest(dryRun, TimeValue(Instant.now().toEpochMilli()), id, workflow) + return client().execute(ExecuteWorkflowAction.INSTANCE, request).get() + } } diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 881b3717b..2e844d300 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -663,6 +663,29 @@ "enabled": false } } + }, + "workflow_metadata" : { + "properties": { + "workflow_id": { + "type": "keyword" + }, + "monitor_ids": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 1000 + } + } + }, + "latest_run_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "latest_execution_id": { + "type": "keyword" + } + } } } }