From f078b7d0217b2d94fccad89b981fe050b64e4b48 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Thu, 23 Mar 2023 18:22:59 +0100 Subject: [PATCH] Added layer for creating and updating the workflow (#831) * Renamed chainedFindings to chainedMonitorFindings * Removed unecessary mappings from workflow definition * Improved logging when saving the workflows * Added a workflow id in response * Added role check and index access once the workflow is being created * Updated mappings for the workflow --------- Signed-off-by: Stevan Buzejic --- alerting/build.gradle | 2 + .../org/opensearch/alerting/AlertingPlugin.kt | 10 +- .../transport/TransportIndexWorkflowAction.kt | 639 ++++++++++++++ .../opensearch/alerting/util/AlertingUtils.kt | 2 + .../org/opensearch/alerting/TestHelpers.kt | 65 ++ .../opensearch/alerting/WorkflowMonitorIT.kt | 802 ++++++++++++++++++ .../transport/AlertingSingleNodeTestCase.kt | 79 +- .../transport/WorkflowSingleNodeTestCase.kt | 75 ++ .../resources/mappings/scheduled-jobs.json | 147 +++- 9 files changed, 1808 insertions(+), 13 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt diff --git a/alerting/build.gradle b/alerting/build.gradle index 248920f3b..63c1ba09a 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -122,6 +122,8 @@ dependencies { testImplementation "org.mockito:mockito-core:${versions.mockito}" testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}" testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}" + testImplementation "org.opensearch.plugin:lang-painless:${opensearch_version}" + testImplementation "org.opensearch.plugin:lang-mustache-client:${opensearch_version}" } javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0fa6eeae4..32fbf3643 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -52,6 +52,7 @@ import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction import org.opensearch.alerting.transport.TransportIndexMonitorAction +import org.opensearch.alerting.transport.TransportIndexWorkflowAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction @@ -80,6 +81,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule @@ -117,6 +119,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val OPEN_SEARCH_DASHBOARDS_USER_AGENT = "OpenSearch-Dashboards" @JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}") @JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors" + @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" @JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" @@ -180,8 +183,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java), ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java), - ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java) - + ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java) ) } @@ -193,7 +196,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R QueryLevelTrigger.XCONTENT_REGISTRY, BucketLevelTrigger.XCONTENT_REGISTRY, ClusterMetricsInput.XCONTENT_REGISTRY, - DocumentLevelTrigger.XCONTENT_REGISTRY + DocumentLevelTrigger.XCONTENT_REGISTRY, + Workflow.XCONTENT_REGISTRY ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt new file mode 100644 index 000000000..961d4d587 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -0,0 +1,639 @@ +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchException +import org.opensearch.OpenSearchStatusException +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthAction +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.opensearchapi.addFilter +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS +import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT +import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTION_THROTTLE_VALUE +import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT +import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IndexUtils +import org.opensearch.alerting.util.isQueryLevelMonitor +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.io.stream.NamedWriteableRegistry +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentFactory.jsonBuilder +import org.opensearch.common.xcontent.XContentHelper +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.authuser.User +import org.opensearch.commons.utils.recreateObject +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.util.stream.Collectors + +private val log = LogManager.getLogger(TransportIndexWorkflowAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportIndexWorkflowAction @Inject constructor( + transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val scheduledJobIndices: ScheduledJobIndices, + val clusterService: ClusterService, + val settings: Settings, + val xContentRegistry: NamedXContentRegistry, + val namedWriteableRegistry: NamedWriteableRegistry, +) : HandledTransportAction( + AlertingActions.INDEX_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::IndexWorkflowRequest +), + SecureTransportAction { + + @Volatile + private var maxMonitors = ALERTING_MAX_MONITORS.get(settings) + + @Volatile + private var requestTimeout = REQUEST_TIMEOUT.get(settings) + + @Volatile + private var indexTimeout = INDEX_TIMEOUT.get(settings) + + @Volatile + private var maxActionThrottle = MAX_ACTION_THROTTLE_VALUE.get(settings) + + @Volatile + private var allowList = ALLOW_LIST.get(settings) + + @Volatile + override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(ALERTING_MAX_MONITORS) { maxMonitors = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(REQUEST_TIMEOUT) { requestTimeout = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_TIMEOUT) { indexTimeout = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_ACTION_THROTTLE_VALUE) { maxActionThrottle = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) { allowList = it } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener) { + val transformedRequest = request as? IndexWorkflowRequest + ?: recreateObject(request, namedWriteableRegistry) { + IndexWorkflowRequest(it) + } + + val user = readUserFromThreadContext(client) + + if (!validateUserBackendRoles(user, actionListener)) { + return + } + + if ( + user != null && + !isAdmin(user) && + transformedRequest.rbacRoles != null + ) { + if (transformedRequest.rbacRoles?.stream()?.anyMatch { !user.backendRoles.contains(it) } == true) { + log.error( + "User specified backend roles, ${transformedRequest.rbacRoles}, " + + "that they don' have access to. User backend roles: ${user.backendRoles}" + ) + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "User specified backend roles that they don't have access to. Contact administrator", + RestStatus.FORBIDDEN + ) + ) + ) + return + } else if (transformedRequest.rbacRoles?.isEmpty() == true) { + log.error( + "Non-admin user are not allowed to specify an empty set of backend roles. " + + "Please don't pass in the parameter or pass in at least one backend role." + ) + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Non-admin user are not allowed to specify an empty set of backend roles.", + RestStatus.FORBIDDEN + ) + ) + ) + return + } + } + scope.launch { + try { + validateRequest(client, transformedRequest, user) + + // If the validation was successful - continue with the execution + client.threadPool().threadContext.stashContext().use { + IndexWorkflowHandler(client, actionListener, transformedRequest, user).resolveUserAndStart() + } + } catch (e: Exception) { + log.error("Failed to create workflow", e) + if (e is IndexNotFoundException) { + actionListener.onFailure( + OpenSearchStatusException( + "Monitors not found", + RestStatus.NOT_FOUND + ) + ) + } else { + actionListener.onFailure(e) + } + } + } + } + + /** + * Validates the request in several steps + * Checks if the user has appropriate backend roles and if he can access the given monitors and it's indices + */ + private suspend fun validateRequest(client: Client, request: IndexWorkflowRequest, user: User?) { + if (request.workflow.inputs.isEmpty()) + throw AlertingException.wrap(IllegalArgumentException("Input list can not be empty.")) + + if (request.workflow.inputs.size > 1) + throw AlertingException.wrap(IllegalArgumentException("Input list can contain only one element.")) + + if (request.workflow.inputs[0] !is CompositeInput) + throw AlertingException.wrap(IllegalArgumentException("When creating a workflow input must be CompositeInput")) + + val compositeInput = request.workflow.inputs[0] as CompositeInput + val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) + + if (monitorIds.isNullOrEmpty()) + throw AlertingException.wrap(IllegalArgumentException("Delegates list can not be empty.")) + + validateDuplicateDelegateMonitorReferenceExists(monitorIds) + validateSequenceOrdering(compositeInput.sequence.delegates) + validateChainedMonitorFindings(compositeInput.sequence.delegates) + + val monitors = getDelegateMonitors(user, monitorIds) + + validateDelegateMonitorsExist(monitorIds, monitors) + validateChainedMonitorFindingsMonitors(compositeInput.sequence.delegates, monitors) + + val indices = getMonitorIndices(monitors) + + validateIndicesAccess(indices, client) + } + + inner class IndexWorkflowHandler( + private val client: Client, + private val actionListener: ActionListener, + private val request: IndexWorkflowRequest, + private val user: User? + ) { + fun resolveUserAndStart() { + scope.launch { + if (user == null) { + // Security is disabled, add empty user to Workflow. user is null for older versions. + request.workflow = request.workflow + .copy(user = User("", listOf(), listOf(), listOf())) + start() + } else { + request.workflow = request.workflow + .copy(user = User(user.name, user.backendRoles, user.roles, user.customAttNames)) + start() + } + } + } + + fun start() { + if (!scheduledJobIndices.scheduledJobIndexExists()) { + scheduledJobIndices.initScheduledJobIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + onCreateMappingsResponse(response.isAcknowledged) + } + + override fun onFailure(t: Exception) { + // https://github.com/opensearch-project/alerting/issues/646 + if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) { + scope.launch { + // Wait for the yellow status + val request = ClusterHealthRequest() + .indices(SCHEDULED_JOBS_INDEX) + .waitForYellowStatus() + val response: ClusterHealthResponse = client.suspendUntil { + execute(ClusterHealthAction.INSTANCE, request, it) + } + if (response.isTimedOut) { + log.error("Workflow creation timeout", t) + actionListener.onFailure( + OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") + ) + } + // Retry mapping of workflow + onCreateMappingsResponse(true) + } + } else { + log.error("Failed to create workflow", t) + actionListener.onFailure(AlertingException.wrap(t)) + } + } + }) + } else if (!IndexUtils.scheduledJobIndexUpdated) { + IndexUtils.updateIndexMapping( + SCHEDULED_JOBS_INDEX, + ScheduledJobIndices.scheduledJobMappings(), clusterService.state(), client.admin().indices(), + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + onUpdateMappingsResponse(response) + } + + override fun onFailure(t: Exception) { + log.error("Failed to create workflow", t) + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } else { + prepareWorkflowIndexing() + } + } + + /** + * This function prepares for indexing a new workflow. + * If this is an update request we can simply update the workflow. Otherwise we first check to see how many monitors already exist, + * and compare this to the [maxMonitorCount]. Requests that breach this threshold will be rejected. + */ + private fun prepareWorkflowIndexing() { + if (request.method == RestRequest.Method.PUT) { + scope.launch { + updateWorkflow() + } + } else { + scope.launch { + indexWorkflow() + } + } + } + + private fun onCreateMappingsResponse(isAcknowledged: Boolean) { + if (isAcknowledged) { + log.info("Created $SCHEDULED_JOBS_INDEX with mappings.") + prepareWorkflowIndexing() + IndexUtils.scheduledJobIndexUpdated() + } else { + log.error("Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged.") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged", RestStatus.INTERNAL_SERVER_ERROR + ) + ) + ) + } + } + + private fun onUpdateMappingsResponse(response: AcknowledgedResponse) { + if (response.isAcknowledged) { + log.info("Updated $SCHEDULED_JOBS_INDEX with mappings.") + IndexUtils.scheduledJobIndexUpdated() + prepareWorkflowIndexing() + } else { + log.error("Update $SCHEDULED_JOBS_INDEX mappings call not acknowledged.") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Updated $SCHEDULED_JOBS_INDEX mappings call not acknowledged.", + RestStatus.INTERNAL_SERVER_ERROR + ) + ) + ) + } + } + + private suspend fun indexWorkflow() { + if (user != null) { + val rbacRoles = if (request.rbacRoles == null) user.backendRoles.toSet() + else if (!isAdmin(user)) request.rbacRoles?.intersect(user.backendRoles)?.toSet() + else request.rbacRoles + + request.workflow = request.workflow.copy( + user = User(user.name, rbacRoles.orEmpty().toList(), user.roles, user.customAttNames) + ) + log.debug("Created workflow's backend roles: $rbacRoles") + } + + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(request.refreshPolicy) + .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .setIfSeqNo(request.seqNo) + .setIfPrimaryTerm(request.primaryTerm) + .timeout(indexTimeout) + + try { + val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } + val failureReasons = checkShardsFailure(indexResponse) + if (failureReasons != null) { + log.error("Failed to create workflow: $failureReasons") + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + ) + return + } + actionListener.onResponse( + IndexWorkflowResponse( + indexResponse.id, indexResponse.version, indexResponse.seqNo, + indexResponse.primaryTerm, request.workflow.copy(id = indexResponse.id) + ) + ) + } catch (t: Exception) { + log.error("Failed to index workflow", t) + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun updateWorkflow() { + val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.workflowId) + try { + val getResponse: GetResponse = client.suspendUntil { client.get(getRequest, it) } + if (!getResponse.isExists) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Workflow with ${request.workflowId} is not found", RestStatus.NOT_FOUND) + ) + ) + return + } + val xcp = XContentHelper.createParser( + xContentRegistry, LoggingDeprecationHandler.INSTANCE, + getResponse.sourceAsBytesRef, XContentType.JSON + ) + val workflow = ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow + onGetResponse(workflow) + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private suspend fun onGetResponse(currentWorkflow: Workflow) { + if (!checkUserPermissionsWithResource(user, currentWorkflow.user, actionListener, "workfklow", request.workflowId)) { + return + } + + // If both are enabled, use the current existing monitor enabled time, otherwise the next execution will be + // incorrect. + if (request.workflow.enabled && currentWorkflow.enabled) + request.workflow = request.workflow.copy(enabledTime = currentWorkflow.enabledTime) + + /** + * On update workflow check which backend roles to associate to the workflow. + * Below are 2 examples of how the logic works + * + * Example 1, say we have a Workflow with backend roles [a, b, c, d] associated with it. + * If I'm User A (non-admin user) and I have backend roles [a, b, c] associated with me and I make a request to update + * the Workflow's backend roles to [a, b]. This would mean that the roles to remove are [c] and the roles to add are [a, b]. + * The Workflow's backend roles would then be [a, b, d]. + * + * Example 2, say we have a Workflow with backend roles [a, b, c, d] associated with it. + * If I'm User A (admin user) and I have backend roles [a, b, c] associated with me and I make a request to update + * the Workflow's backend roles to [a, b]. This would mean that the roles to remove are [c, d] and the roles to add are [a, b]. + * The Workflow's backend roles would then be [a, b]. + */ + if (user != null) { + if (request.rbacRoles != null) { + if (isAdmin(user)) { + request.workflow = request.workflow.copy( + user = User(user.name, request.rbacRoles, user.roles, user.customAttNames) + ) + } else { + // rolesToRemove: these are the backend roles to remove from the monitor + val rolesToRemove = user.backendRoles - request.rbacRoles.orEmpty() + // remove the monitor's roles with rolesToRemove and add any roles passed into the request.rbacRoles + val updatedRbac = currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty() + request.workflow = request.workflow.copy( + user = User(user.name, updatedRbac, user.roles, user.customAttNames) + ) + } + } else { + request.workflow = request.workflow + .copy(user = User(user.name, currentWorkflow.user!!.backendRoles, user.roles, user.customAttNames)) + } + log.debug("Update workflow backend roles to: ${request.workflow.user?.backendRoles}") + } + + request.workflow = request.workflow.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) + val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(request.refreshPolicy) + .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .id(request.workflowId) + .setIfSeqNo(request.seqNo) + .setIfPrimaryTerm(request.primaryTerm) + .timeout(indexTimeout) + + try { + val indexResponse: IndexResponse = client.suspendUntil { client.index(indexRequest, it) } + val failureReasons = checkShardsFailure(indexResponse) + if (failureReasons != null) { + actionListener.onFailure( + AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + ) + return + } + actionListener.onResponse( + IndexWorkflowResponse( + indexResponse.id, indexResponse.version, indexResponse.seqNo, + indexResponse.primaryTerm, request.workflow.copy(id = currentWorkflow.id) + ) + ) + } catch (t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + + private fun checkShardsFailure(response: IndexResponse): String? { + val failureReasons = StringBuilder() + if (response.shardInfo.failed > 0) { + response.shardInfo.failures.forEach { entry -> + failureReasons.append(entry.reason()) + } + return failureReasons.toString() + } + return null + } + } + + private fun validateChainedMonitorFindings(delegates: List) { + val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } + delegates.forEach { + if (it.chainedMonitorFindings != null) { + if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) { + throw AlertingException.wrap( + IllegalArgumentException( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence" + ) + ) + } + if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) { + throw AlertingException.wrap( + IllegalArgumentException( + "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}" + ) + ) + } + } + } + } + + private fun validateChainedMonitorFindingsMonitors(delegates: List, monitorDelegates: List) { + val monitorsById = monitorDelegates.associateBy { it.id } + delegates.forEach { + if (it.chainedMonitorFindings != null) { + val chainedFindingMonitor = monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Chained finding monitor doesn't exist") + ) + + if (chainedFindingMonitor.isQueryLevelMonitor()) { + throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings")) + } + } + } + } + + private fun validateSequenceOrdering(delegates: List) { + val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) + if (orderSet.size != delegates.size) { + throw AlertingException.wrap(IllegalArgumentException("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) + } + } + + private fun validateDuplicateDelegateMonitorReferenceExists( + monitorIds: MutableList + ) { + if (monitorIds.toSet().size != monitorIds.size) { + throw AlertingException.wrap(IllegalArgumentException("Duplicate delegates not allowed")) + } + } + + private fun validateDelegateMonitorsExist( + monitorIds: List, + delegateMonitors: List + ) { + val reqMonitorIds: MutableList = monitorIds as MutableList + delegateMonitors.forEach { + reqMonitorIds.remove(it.id) + } + if (reqMonitorIds.isNotEmpty()) { + throw AlertingException.wrap(IllegalArgumentException(("${reqMonitorIds.joinToString()} are not valid monitor ids"))) + } + } + + /** + * Returns monitors for the given ids if user has an access + */ + private suspend fun getDelegateMonitors( + user: User?, + monitorIds: MutableList + ): List { + val query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", monitorIds)) + val searchSource = SearchSourceBuilder().query(query) + val monitorSearchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) + // TODO - Add secure tests once the Rest Action is created + if (user != null && filterByEnabled) { + addFilter(user, monitorSearchRequest.source(), "monitor.user.backend_roles.keyword") + } + + val searchMonitorResponse: SearchResponse = client.suspendUntil { client.search(monitorSearchRequest, it) } + + if (searchMonitorResponse.status() != RestStatus.OK) { + throw AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured monitors ${monitorIds.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + } + if (searchMonitorResponse.isTimedOut) { + throw OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") + } + val monitors = mutableListOf() + for (hit in searchMonitorResponse.hits) { + XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.sourceAsString + ).use { hitsParser -> + val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor + monitors.add(monitor) + } + } + return monitors + } + + /** + * Extract indices from monitors + */ + private fun getMonitorIndices(monitors: List): MutableList { + val indices = mutableListOf() + + val searchInputs = + monitors.flatMap { monitor -> monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD || it.name() == DocLevelMonitorInput.DOC_LEVEL_INPUT_FIELD } } + searchInputs.forEach { + val inputIndices = if (it.name() == SearchInput.SEARCH_FIELD) (it as SearchInput).indices + else (it as DocLevelMonitorInput).indices + indices.addAll(inputIndices) + } + return indices + } + + /** + * Checks if the user can access the monitor indices + */ + private suspend fun validateIndicesAccess( + indices: MutableList, + client: Client, + ) { + val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) + .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) + + val indicesSearchResponse: SearchResponse = client.suspendUntil { client.search(indicesSearchRequest, it) } + if (indicesSearchResponse.status() != RestStatus.OK) { + throw AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured index ${indices.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 849b1deb3..c58a6d3bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -46,6 +46,8 @@ fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTI fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR +fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR + /** * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used * as the key for a HashMap to easily retrieve [AggregationResultBucket] based on the bucket key values. diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index fa408308d..d9c6cd1e3 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -35,8 +35,11 @@ import org.opensearch.commons.alerting.model.ActionExecutionResult import org.opensearch.commons.alerting.model.AggregationResultBucket import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.BucketLevelTrigger +import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.Delegate import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.DocumentLevelTrigger @@ -47,7 +50,10 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.Schedule import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.model.Sequence import org.opensearch.commons.alerting.model.Trigger +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.Workflow.WorkflowType import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope @@ -217,6 +223,61 @@ fun randomDocumentLevelMonitor( ) } +fun randomWorkflow( + id: String = Workflow.NO_ID, + monitorIds: List, + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS) +): Workflow { + val delegates = mutableListOf() + if (!monitorIds.isNullOrEmpty()) { + delegates.add(Delegate(1, monitorIds[0])) + for (i in 1 until monitorIds.size) { + // Order of monitors in workflow will be the same like forwarded meaning that the first monitorId will be used as second monitor chained finding + delegates.add(Delegate(i + 1, monitorIds [i], ChainedMonitorFindings(monitorIds[i - 1]))) + } + } + + return Workflow( + id = id, + name = name, + enabled = enabled, + schedule = schedule, + lastUpdateTime = lastUpdateTime, + enabledTime = enabledTime, + workflowType = WorkflowType.COMPOSITE, + user = user, + inputs = listOf(CompositeInput(Sequence(delegates))) + ) +} + +fun randomWorkflowWithDelegates( + id: String = Workflow.NO_ID, + delegates: List, + name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), + user: User? = randomUser(), + schedule: Schedule = IntervalSchedule(interval = 5, unit = ChronoUnit.MINUTES), + enabled: Boolean = randomBoolean(), + enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null, + lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), +): Workflow { + return Workflow( + id = id, + name = name, + enabled = enabled, + schedule = schedule, + lastUpdateTime = lastUpdateTime, + enabledTime = enabledTime, + workflowType = WorkflowType.COMPOSITE, + user = user, + inputs = listOf(CompositeInput(Sequence(delegates))) + ) +} + fun randomQueryLevelTrigger( id: String = UUIDs.base64UUID(), name: String = OpenSearchRestTestCase.randomAlphaOfLength(10), @@ -686,3 +747,7 @@ fun assertUserNull(map: Map) { fun assertUserNull(monitor: Monitor) { assertNull("User is not null", monitor.user) } + +fun assertUserNull(workflow: Workflow) { + assertNull("User is not null", workflow.user) +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt new file mode 100644 index 000000000..7363a5e7b --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -0,0 +1,802 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting + +import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase +import org.opensearch.commons.alerting.model.ChainedMonitorFindings +import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.DataSources +import org.opensearch.commons.alerting.model.Delegate +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.rest.RestRequest +import java.util.Collections + +class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { + + fun `test create workflow success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + val workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse1.id, delegate2.chainedMonitorFindings!!.monitorId + ) + } + + fun `test update workflow add monitor success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + var workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + val monitor3 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse3 = createMonitor(monitor3)!! + + val updatedWorkflowResponse = upsertWorkflow( + randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id, monitorResponse3.id) + ), + workflowResponse.id, + RestRequest.Method.PUT + )!! + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse.workflow) + assertEquals("Workflow id changed", workflowResponse.id, updatedWorkflowResponse.id) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = searchWorkflow(updatedWorkflowResponse.id)!! + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", updatedWorkflowResponse.workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", updatedWorkflowResponse.workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 3, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse2.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse1.id, delegate2.chainedMonitorFindings!!.monitorId + ) + + val delegate3 = delegates[2] + assertNotNull(delegate3) + assertEquals("Delegate3 order not correct", 3, delegate3.order) + assertEquals("Delegate3 id not correct", monitorResponse3.id, delegate3.monitorId) + assertEquals( + "Delegate3 Chained finding not correct", monitorResponse2.id, delegate3.chainedMonitorFindings!!.monitorId + ) + } + + fun `test update workflow change order of delegate monitors`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + var workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + val updatedWorkflowResponse = upsertWorkflow( + randomWorkflow( + monitorIds = listOf(monitorResponse2.id, monitorResponse1.id) + ), + workflowResponse.id, + RestRequest.Method.PUT + )!! + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse.workflow) + assertEquals("Workflow id changed", workflowResponse.id, updatedWorkflowResponse.id) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = searchWorkflow(updatedWorkflowResponse.id)!! + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", updatedWorkflowResponse.workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", updatedWorkflowResponse.workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 2, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse2.id, delegate1.monitorId) + + val delegate2 = delegates[1] + assertNotNull(delegate2) + assertEquals("Delegate2 order not correct", 2, delegate2.order) + assertEquals("Delegate2 id not correct", monitorResponse1.id, delegate2.monitorId) + assertEquals( + "Delegate2 Chained finding not correct", monitorResponse2.id, delegate2.chainedMonitorFindings!!.monitorId + ) + } + + fun `test update workflow remove monitor success`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + assertNotNull(workflowResponse.workflow) + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowResponse.id) + assertTrue("incorrect version", workflowResponse.version > 0) + + var workflowById = searchWorkflow(workflowResponse.id)!! + assertNotNull(workflowById) + + val updatedWorkflowResponse = upsertWorkflow( + randomWorkflow( + monitorIds = listOf(monitorResponse1.id) + ), + workflowResponse.id, + RestRequest.Method.PUT + )!! + + assertNotNull("Workflow creation failed", updatedWorkflowResponse) + assertNotNull(updatedWorkflowResponse.workflow) + assertEquals("Workflow id changed", workflowResponse.id, updatedWorkflowResponse.id) + assertTrue("incorrect version", updatedWorkflowResponse.version > 0) + + workflowById = searchWorkflow(updatedWorkflowResponse.id)!! + + // Verify workflow + assertNotEquals("response is missing Id", Monitor.NO_ID, workflowById.id) + assertTrue("incorrect version", workflowById.version > 0) + assertEquals("Workflow name not correct", updatedWorkflowResponse.workflow.name, workflowById.name) + assertEquals("Workflow owner not correct", updatedWorkflowResponse.workflow.owner, workflowById.owner) + assertEquals("Workflow input not correct", updatedWorkflowResponse.workflow.inputs, workflowById.inputs) + + // Delegate verification + @Suppress("UNCHECKED_CAST") + val delegates = (workflowById.inputs as List)[0].sequence.delegates.sortedBy { it.order } + assertEquals("Delegates size not correct", 1, delegates.size) + + val delegate1 = delegates[0] + assertNotNull(delegate1) + assertEquals("Delegate1 order not correct", 1, delegate1.order) + assertEquals("Delegate1 id not correct", monitorResponse1.id, delegate1.monitorId) + } + + fun `test update workflow doesn't exist failure`() { + val docQuery1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + + val monitorResponse1 = createMonitor(monitor1)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + try { + upsertWorkflow(workflow, "testId", RestRequest.Method.PUT) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow with testId is not found") + ) + } + } + } + + fun `test create workflow without delegate failure`() { + val workflow = randomWorkflow( + monitorIds = Collections.emptyList() + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test update workflow without delegate failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + ) + + val monitorResponse1 = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse1.id, monitorResponse2.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflow( + id = workflowResponse.id, + monitorIds = Collections.emptyList() + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Delegates list can not be empty.") + ) + } + } + } + + fun `test create workflow duplicate delegate failure`() { + val workflow = randomWorkflow( + monitorIds = listOf("1", "1", "2") + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test update workflow duplicate delegate failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflow( + id = workflowResponse.id, + monitorIds = listOf("1", "1", "2") + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Duplicate delegates not allowed") + ) + } + } + } + + fun `test create workflow delegate monitor doesn't exist failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + val workflow = randomWorkflow( + monitorIds = listOf("-1", monitorResponse.id) + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test update workflow delegate monitor doesn't exist failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + workflow = randomWorkflow( + id = workflowResponse.id, + monitorIds = listOf("-1", monitorResponse.id) + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("are not valid monitor ids") + ) + } + } + } + + fun `test create workflow sequence order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test update workflow sequence order not correct failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(1, "monitor-2"), + Delegate(2, "monitor-3") + ) + workflow = randomWorkflowWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Sequence ordering of delegate monitor shouldn't contain duplicate order values") + ) + } + } + } + + fun `test create workflow chained findings monitor not in sequence failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow query monitor chained findings monitor failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val docMonitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val docMonitorResponse = createMonitor(docMonitor)!! + + val queryMonitor = randomQueryLevelMonitor() + val queryMonitorResponse = createMonitor(queryMonitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(queryMonitorResponse.id, docMonitorResponse.id) + ) + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Query level monitor can't be part of chained findings") + ) + } + } + } + + fun `test create workflow when monitor index not initialized failure`() { + val delegates = listOf( + Delegate(1, "monitor-1") + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Monitors not found") + ) + } + } + } + + fun `test update workflow chained findings monitor not in sequence failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(2, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(3, "monitor-3", ChainedMonitorFindings("monitor-x")) + ) + workflow = randomWorkflowWithDelegates( + id = workflowResponse.id, + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-x doesn't exist in sequence") + ) + } + } + } + + fun `test create workflow chained findings order not correct failure`() { + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + val workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } + + fun `test update workflow chained findings order not correct failure`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + assertNotNull("Workflow creation failed", workflowResponse) + + val delegates = listOf( + Delegate(1, "monitor-1"), + Delegate(3, "monitor-2", ChainedMonitorFindings("monitor-1")), + Delegate(2, "monitor-3", ChainedMonitorFindings("monitor-2")) + ) + workflow = randomWorkflowWithDelegates( + delegates = delegates + ) + + try { + upsertWorkflow(workflow) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning IndexWorkflow Action error ", + it.contains("Chained Findings Monitor monitor-2 should be executed before monitor monitor-3") + ) + } + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index 8c889a426..38f3fd9b9 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting.transport import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequest import org.opensearch.action.admin.indices.get.GetIndexRequestBuilder import org.opensearch.action.admin.indices.get.GetIndexResponse @@ -23,6 +24,8 @@ import org.opensearch.alerting.action.GetMonitorRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue +import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions @@ -35,16 +38,22 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.Finding import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.Table +import org.opensearch.index.IndexService import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.reindex.ReindexModulePlugin import org.opensearch.index.seqno.SequenceNumbers -import org.opensearch.join.ParentJoinModulePlugin +import org.opensearch.join.ParentJoinPlugin +import org.opensearch.painless.PainlessPlugin import org.opensearch.plugins.Plugin import org.opensearch.rest.RestRequest +import org.opensearch.script.mustache.MustachePlugin import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.fetch.subphase.FetchSourceContext import org.opensearch.test.OpenSearchSingleNodeTestCase import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.util.Locale /** @@ -75,19 +84,60 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { return client().execute(ExecuteMonitorAction.INSTANCE, request).get() } + protected fun insertSampleTimeSerializedData(index: String, data: List) { + data.forEachIndexed { i, value -> + val twoMinsAgo = ZonedDateTime.now().minus(2, ChronoUnit.MINUTES).truncatedTo(ChronoUnit.MILLIS) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(twoMinsAgo) + val testDoc = """ + { + "test_strict_date_time": "$testTime", + "test_field_1": "$value", + "number": "$i" + } + """.trimIndent() + // Indexing documents with deterministic doc id to allow for easy selected deletion during testing + indexDoc(index, (i + 1).toString(), testDoc) + } + } + + @Suppress("UNCHECKED_CAST") + fun Map.stringMap(key: String): Map? { + val map = this as Map> + return map[key] + } + /** A test index that can be used across tests. Feel free to add new fields but don't remove any. */ protected fun createTestIndex() { + val mapping = XContentFactory.jsonBuilder() + mapping.startObject() + .startObject("properties") + .startObject("test_strict_date_time") + .field("type", "date") + .field("format", "strict_date_time") + .endObject() + .startObject("test_field_1") + .field("type", "keyword") + .endObject() + .endObject() + .endObject() + createIndex( - index, Settings.EMPTY, - """ - "properties" : { - "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, - "test_field" : { "type" : "keyword" } - } - """.trimIndent() + index, Settings.EMPTY, mapping ) } + private fun createIndex( + index: String?, + settings: Settings?, + mappings: XContentBuilder?, + ): IndexService? { + val createIndexRequestBuilder = client().admin().indices().prepareCreate(index).setSettings(settings) + if (mappings != null) { + createIndexRequestBuilder.setMapping(mappings) + } + return this.createIndex(index, createIndexRequestBuilder) + } + protected fun indexDoc(index: String, id: String, doc: String) { client().prepareIndex(index).setId(id) .setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get() @@ -230,7 +280,18 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { ).get() override fun getPlugins(): List> { - return listOf(AlertingPlugin::class.java, ReindexModulePlugin::class.java, ParentJoinModulePlugin::class.java) + return listOf( + AlertingPlugin::class.java, + ReindexPlugin::class.java, + MustachePlugin::class.java, + PainlessPlugin::class.java, + ParentJoinPlugin::class.java + ) + } + + protected fun deleteIndex(index: String) { + val response = client().admin().indices().delete(DeleteIndexRequest(index)).get() + assertTrue("Unable to delete index", response.isAcknowledged()) } override fun resetNodeAfterTest(): Boolean { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt new file mode 100644 index 000000000..d3a77d81b --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -0,0 +1,75 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope +import org.opensearch.action.support.WriteRequest +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.json.JsonXContent +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.IndexWorkflowRequest +import org.opensearch.commons.alerting.action.IndexWorkflowResponse +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.index.query.TermQueryBuilder +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.rest.RestRequest +import org.opensearch.search.builder.SearchSourceBuilder + +/** + * A test that keep a singleton node started for all tests that can be used to get + * references to Guice injectors in unit tests. + */ + +@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { + + protected fun searchWorkflow( + id: String, + indices: String = ScheduledJob.SCHEDULED_JOBS_INDEX, + refresh: Boolean = true, + ): Workflow? { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return null + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder("_id", id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { it -> + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + lateinit var workflow: Workflow + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "workflow" -> workflow = Workflow.parse(xcp) + } + } + workflow.copy(id = it.id, version = it.version) + }.first() + } + + protected fun upsertWorkflow( + workflow: Workflow, + id: String = Workflow.NO_ID, + method: RestRequest.Method = RestRequest.Method.POST, + ): IndexWorkflowResponse? { + val request = IndexWorkflowRequest( + workflowId = id, + seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, + primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + refreshPolicy = WriteRequest.RefreshPolicy.parse("true"), + method = method, + workflow = workflow + ) + + return client().execute(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, request).actionGet() + } +} diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 4f8d71d82..881b3717b 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 6 + "schema_version": 7 }, "properties": { "monitor": { @@ -299,6 +299,151 @@ } } }, + "workflow": { + "dynamic": "false", + "properties": { + "schema_version": { + "type": "integer" + }, + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "owner": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "workflow_type": { + "type": "keyword" + }, + "user": { + "properties": { + "name": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + }, + "backend_roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "roles": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + }, + "custom_attribute_names": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } + } + } + }, + "type": { + "type": "keyword" + }, + "enabled": { + "type": "boolean" + }, + "enabled_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_update_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "schedule": { + "properties": { + "period": { + "properties": { + "interval": { + "type": "integer" + }, + "unit": { + "type": "keyword" + } + } + }, + "cron": { + "properties": { + "expression": { + "type": "text" + }, + "timezone": { + "type": "keyword" + } + } + } + } + }, + "inputs": { + "type": "nested", + "properties": { + "composite_input": { + "type": "nested", + "properties": { + "sequence": { + "properties": { + "delegates": { + "type": "nested", + "properties": { + "order": { + "type": "integer" + }, + "monitor_id": { + "type": "keyword" + }, + "chained_monitor_findings": { + "properties": { + "monitor_id": { + "type": "keyword" + } + } + } + } + } + } + } + } + } + } + }, + "group_by_fields": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + }, "destination": { "dynamic": "false", "properties": {