From c328bab597aa9b75f524f3a3922d65330bcfe7a1 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Mon, 15 May 2023 22:41:45 +0200 Subject: [PATCH] Fixed deleting monitor workflow metadata (#882) * Fixed deleting monitor metadata and workflow metadata. Signed-off-by: Stevan Buzejic Signed-off-by: Surya Sashank Nistala --- .../org/opensearch/alerting/AlertingPlugin.kt | 3 + .../alerting/MonitorMetadataService.kt | 16 +- .../alerting/WorkflowMetadataService.kt | 7 +- .../alerting/model/MonitorMetadata.kt | 14 +- .../alerting/service/DeleteMonitorService.kt | 186 +++++++++++++ .../transport/TransportDeleteMonitorAction.kt | 151 ++--------- .../TransportDeleteWorkflowAction.kt | 151 +++++++---- .../TransportExecuteWorkflowAction.kt | 6 +- .../transport/TransportIndexWorkflowAction.kt | 18 +- .../org/opensearch/alerting/TestHelpers.kt | 10 +- .../opensearch/alerting/WorkflowMonitorIT.kt | 87 +++++- .../opensearch/alerting/WorkflowRunnerIT.kt | 251 ++++++++++++++++-- .../transport/AlertingSingleNodeTestCase.kt | 2 +- .../transport/WorkflowSingleNodeTestCase.kt | 30 +++ 14 files changed, 704 insertions(+), 228 deletions(-) create mode 100644 alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 9d01e02e2..58e82bd59 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -40,6 +40,7 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction import org.opensearch.alerting.script.TriggerScript +import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings @@ -279,6 +280,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R settings ) + DeleteMonitorService.initialize(client) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 3855f4897..c27584f9e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -171,7 +171,7 @@ object MonitorMetadataService : else null val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap>) - } else null + else null return if (runContext != null) { metadata.copy( lastRunContext = runContext @@ -184,12 +184,15 @@ object MonitorMetadataService : } } - private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean, workflowMetadataId: String? = null): MonitorMetadata { - val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + private suspend fun createNewMetadata( + monitor: Monitor, + createWithRunContext: Boolean, + workflowMetadataId: String? = null, + ): MonitorMetadata { + val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) (monitor.inputs[0] as DocLevelMonitorInput).indices[0] else null - val runContext = - if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext) + val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext) createFullRunContext(monitorIndex) else emptyMap() return MonitorMetadata( @@ -202,8 +205,7 @@ object MonitorMetadataService : sourceToQueryIndexMapping = mutableMapOf() ) } - - private suspend fun createFullRunContext( + suspend fun createFullRunContext( index: String?, existingRunContext: MutableMap>? = null ): MutableMap> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt index 056830af7..7020732fd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/WorkflowMetadataService.kt @@ -98,7 +98,10 @@ object WorkflowMetadataService : } catch (e: Exception) { // If the update is set to false and id is set conflict exception will be thrown if (e is OpenSearchException && e.status() == RestStatus.CONFLICT && !updating) { - log.debug("Metadata with ${metadata.id} for workflow ${metadata.workflowId} already exist. Instead of creating new, updating existing metadata will be performed") + log.debug( + "Metadata with ${metadata.id} for workflow ${metadata.workflowId} already exist." + + " Instead of creating new, updating existing metadata will be performed" + ) return upsertWorkflowMetadata(metadata, true) } log.error("Error saving metadata", e) @@ -157,6 +160,8 @@ object WorkflowMetadataService : } private fun createNewWorkflowMetadata(workflow: Workflow, executionId: String, isTempWorkflow: Boolean): WorkflowMetadata { + // In the case of temp workflow (ie. workflow is in dry-run) use timestampWithUUID-metadata format + // In the case of regular workflow execution, use the workflowId-metadata format val id = if (isTempWorkflow) "${LocalDateTime.now(ZoneOffset.UTC)}${UUID.randomUUID()}" else workflow.id return WorkflowMetadata( id = WorkflowMetadata.getId(id), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt index b20780107..ace38be6a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorMetadata.kt @@ -119,9 +119,17 @@ data class MonitorMetadata( return MonitorMetadata(sin) } - fun getId(monitor: Monitor, workflowId: String? = null): String { - return if (workflowId.isNullOrEmpty()) "${monitor.id}-metadata" - else "${monitor.id}-$workflowId-metadata" + /** workflowMetadataId is used as key for monitor metadata in the case when the workflow execution happens + so the monitor lastRunContext (in the case of doc level monitor) is not interfering with the monitor execution + WorkflowMetadataId will be either workflowId-metadata (when executing the workflow as it is scheduled) + or timestampWithUUID-metadata (when a workflow is executed in a dry-run mode) + In the case of temp workflow, doc level monitors must have lastRunContext created from scratch + That's why we are using workflowMetadataId - in order to ensure that the doc level monitor metadata is created from scratch + **/ + fun getId(monitor: Monitor, workflowMetadataId: String? = null): String { + return if (workflowMetadataId.isNullOrEmpty()) "${monitor.id}-metadata" + // WorkflowMetadataId already contains -metadata suffix + else "$workflowMetadataId-${monitor.id}-metadata" } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt new file mode 100644 index 000000000..a21ec379e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -0,0 +1,186 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.service + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import org.apache.logging.log4j.LogManager +import org.apache.lucene.search.join.ScoreMode +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.transport.TransportDeleteWorkflowAction.Companion.WORKFLOW_DELEGATE_PATH +import org.opensearch.alerting.transport.TransportDeleteWorkflowAction.Companion.WORKFLOW_MONITOR_PATH +import org.opensearch.alerting.util.AlertingException +import org.opensearch.client.Client +import org.opensearch.commons.alerting.action.DeleteMonitorResponse +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder +import org.opensearch.search.builder.SearchSourceBuilder +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine + +/** + * Component used when deleting the monitors + */ +object DeleteMonitorService : + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("WorkflowMetadataService")) { + private val log = LogManager.getLogger(this.javaClass) + + private lateinit var client: Client + + fun initialize( + client: Client, + ) { + DeleteMonitorService.client = client + } + + /** + * Deletes the monitor, docLevelQueries and monitor metadata + * @param monitor monitor to be deleted + * @param refreshPolicy + */ + suspend fun deleteMonitor(monitor: Monitor, refreshPolicy: RefreshPolicy): DeleteMonitorResponse { + val deleteResponse = deleteMonitor(monitor.id, refreshPolicy) + deleteDocLevelMonitorQueriesAndIndices(monitor) + deleteMetadata(monitor) + return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version) + } + + private suspend fun deleteMonitor(monitorId: String, refreshPolicy: RefreshPolicy): DeleteResponse { + val deleteMonitorRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, monitorId) + .setRefreshPolicy(refreshPolicy) + return client.suspendUntil { delete(deleteMonitorRequest, it) } + } + + private suspend fun deleteMetadata(monitor: Monitor) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") + .setRefreshPolicy(RefreshPolicy.IMMEDIATE) + try { + val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } + log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}") + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted, + // we cannot retry based on this failure + log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e) + } + } + + private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { + try { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return + } + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitor.id) + ) + ) + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + } + } + } catch (e: Exception) { + // we only log the error and don't fail the request because if monitor document has been deleted successfully, + // we cannot retry based on this failure + log.error("Failed to delete doc level queries from query index.", e) + } + } + + /** + * Checks if the monitor is part of the workflow + * + * @param monitorId id of monitor that is checked if it is a workflow delegate + */ + suspend fun monitorIsWorkflowDelegate(monitorId: String): Boolean { + val queryBuilder = QueryBuilders.nestedQuery( + WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.matchQuery( + WORKFLOW_MONITOR_PATH, + monitorId + ) + ), + ScoreMode.None + ) + try { + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder)) + + client.threadPool().threadContext.stashContext().use { + val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } + if (searchResponse.hits.totalHits?.value == 0L) { + return false + } + + val workflowIds = searchResponse.hits.hits.map { it.id }.joinToString() + log.info("Monitor $monitorId can't be deleted since it belongs to $workflowIds") + return true + } + } catch (ex: Exception) { + log.error("Error getting the monitor workflows", ex) + throw AlertingException.wrap(ex) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index b96ef8119..b72a77cfc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -12,22 +12,13 @@ import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest -import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse -import org.opensearch.action.delete.DeleteRequest -import org.opensearch.action.delete.DeleteResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse -import org.opensearch.action.search.SearchRequest -import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.action.support.IndicesOptions -import org.opensearch.action.support.WriteRequest -import org.opensearch.action.support.master.AcknowledgedResponse -import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -45,17 +36,9 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.index.query.QueryBuilders -import org.opensearch.index.reindex.BulkByScrollResponse -import org.opensearch.index.reindex.DeleteByQueryAction -import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus -import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException -import kotlin.coroutines.suspendCoroutine private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) private val log = LogManager.getLogger(TransportDeleteMonitorAction::class.java) @@ -82,42 +65,52 @@ class TransportDeleteMonitorAction @Inject constructor( val transformedRequest = request as? DeleteMonitorRequest ?: recreateObject(request) { DeleteMonitorRequest(it) } val user = readUserFromThreadContext(client) - val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.monitorId) - .setRefreshPolicy(transformedRequest.refreshPolicy) if (!validateUserBackendRoles(user, actionListener)) { return } scope.launch { - DeleteMonitorHandler(client, actionListener, deleteRequest, user, transformedRequest.monitorId).resolveUserAndStart() + DeleteMonitorHandler( + client, + actionListener, + user, + transformedRequest.monitorId + ).resolveUserAndStart(transformedRequest.refreshPolicy) } } inner class DeleteMonitorHandler( private val client: Client, private val actionListener: ActionListener, - private val deleteRequest: DeleteRequest, private val user: User?, private val monitorId: String ) { - suspend fun resolveUserAndStart() { + suspend fun resolveUserAndStart(refreshPolicy: RefreshPolicy) { try { val monitor = getMonitor() - val canDelete = user == null || - !doFilterForUser(user) || + val canDelete = user == null || !doFilterForUser(user) || checkUserPermissionsWithResource(user, monitor.user, actionListener, "monitor", monitorId) - if (canDelete) { - val deleteResponse = deleteAllResourcesForMonitor(client, monitor, deleteRequest, monitorId) - actionListener.onResponse(DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)) + if (DeleteMonitorService.monitorIsWorkflowDelegate(monitor.id)) { + actionListener.onFailure( + AlertingException( + "Monitor can't be deleted because it is a part of workflow(s)", + RestStatus.FORBIDDEN, + IllegalStateException() + ) + ) + } else if (canDelete) { + actionListener.onResponse( + DeleteMonitorService.deleteMonitor(monitor, refreshPolicy) + ) } else { actionListener.onFailure( AlertingException("Not allowed to delete this monitor!", RestStatus.FORBIDDEN, IllegalStateException()) ) } } catch (t: Exception) { - log.error("Failed to delete monitor ${deleteRequest.id()}", t) + log.error("Failed to delete monitor $monitorId", t) actionListener.onFailure(AlertingException.wrap(t)) } } @@ -140,102 +133,4 @@ class TransportDeleteMonitorAction @Inject constructor( return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Monitor } } - - companion object { - @JvmStatic - suspend fun deleteAllResourcesForMonitor( - client: Client, - monitor: Monitor, - deleteRequest: DeleteRequest, - monitorId: String, - ): DeleteResponse { - val deleteResponse = deleteMonitorDocument(client, deleteRequest) - deleteMetadata(client, monitor) - deleteDocLevelMonitorQueriesAndIndices(client, monitor, monitorId) - return deleteResponse - } - - private suspend fun deleteMonitorDocument(client: Client, deleteRequest: DeleteRequest): DeleteResponse { - return client.suspendUntil { delete(deleteRequest, it) } - } - - suspend fun deleteMetadata(client: Client, monitor: Monitor) { - val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${monitor.id}-metadata") - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - try { - val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } - log.debug("Monitor metadata: ${deleteResponse.id} deletion result: ${deleteResponse.result}") - } catch (e: Exception) { - // we only log the error and don't fail the request because if monitor document has been deleted, - // we cannot retry based on this failure - log.error("Failed to delete monitor metadata ${deleteRequest.id()}.", e) - } - } - - suspend fun deleteDocLevelMonitorQueriesAndIndices( - client: Client, - monitor: Monitor, - monitorId: String, - ) { - try { - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) - } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitorId) - ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it - ) - } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) { - cont.resumeWithException(t) - } - } - ) - } - } - } - } catch (e: Exception) { - // we only log the error and don't fail the request because if monitor document has been deleted successfully, - // we cannot retry based on this failure - log.error("Failed to delete doc level queries from query index.", e) - } - } - } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index 2fd84030e..79fd817ca 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.apache.lucene.search.join.ScoreMode +import org.opensearch.OpenSearchException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest @@ -23,27 +24,25 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy -import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.model.MonitorMetadata +import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.opensearchapi.suspendUntil -import org.opensearch.alerting.opensearchapi.withClosableContext +import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client -import org.opensearch.client.node.NodeClient import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType -import org.opensearch.commons.alerting.AlertingPluginInterface import org.opensearch.commons.alerting.action.AlertingActions -import org.opensearch.commons.alerting.action.DeleteMonitorRequest -import org.opensearch.commons.alerting.action.DeleteMonitorResponse import org.opensearch.commons.alerting.action.DeleteWorkflowRequest import org.opensearch.commons.alerting.action.DeleteWorkflowResponse import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User @@ -52,11 +51,13 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService -import java.util.UUID private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) /** @@ -69,12 +70,11 @@ class TransportDeleteWorkflowAction @Inject constructor( actionFilters: ActionFilters, val clusterService: ClusterService, val settings: Settings, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, ) : HandledTransportAction( AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest ), SecureTransportAction { - private val log = LogManager.getLogger(javaClass) @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) @@ -113,7 +113,7 @@ class TransportDeleteWorkflowAction @Inject constructor( private val deleteRequest: DeleteRequest, private val deleteDelegateMonitors: Boolean?, private val user: User?, - private val workflowId: String + private val workflowId: String, ) { suspend fun resolveUserAndStart() { try { @@ -131,13 +131,14 @@ class TransportDeleteWorkflowAction @Inject constructor( if (canDelete) { val delegateMonitorIds = (workflow.inputs[0] as CompositeInput).getMonitorIds() - + var deletableMonitors = listOf() // User can only delete the delegate monitors only in the case if all monitors can be deleted - // Partial monitor deletion is not available + // if there are monitors in this workflow that are referenced in other workflows, we cannot delete the monitors. + // We will not partially delete monitors. we delete them all or fail the request. if (deleteDelegateMonitors == true) { - val monitorIdsToBeDeleted = getDeletableDelegates(workflowId, delegateMonitorIds, user) + deletableMonitors = getDeletableDelegates(workflowId, delegateMonitorIds, user) val monitorsDiff = delegateMonitorIds.toMutableList() - monitorsDiff.removeAll(monitorIdsToBeDeleted) + monitorsDiff.removeAll(deletableMonitors.map { it.id }) if (monitorsDiff.isNotEmpty()) { actionListener.onFailure( @@ -151,26 +152,33 @@ class TransportDeleteWorkflowAction @Inject constructor( } } - val deleteResponse = deleteWorkflow(workflow) + val deleteResponse = deleteWorkflow(deleteRequest) + var deleteWorkflowResponse = DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version) + + val workflowMetadataId = WorkflowMetadata.getId(workflow.id) + + val metadataIdsToDelete = mutableListOf(workflowMetadataId) + if (deleteDelegateMonitors == true) { - if (user != null && filterByEnabled) { - // Un-stash the context - withClosableContext( - InjectorContextElement( - user.name.plus(UUID.randomUUID().toString()), - settings, - client.threadPool().threadContext, - user.roles, - user - ) - ) { - deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) - } - } else { - deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + val failedMonitorIds = tryDeletingMonitors(deletableMonitors, RefreshPolicy.IMMEDIATE) + // Update delete workflow response + deleteWorkflowResponse.nonDeletedMonitors = failedMonitorIds + // Delete monitors workflow metadata + // Monitor metadata will be in workflowId-monitorId-metadata format + metadataIdsToDelete.addAll(deletableMonitors.map { MonitorMetadata.getId(it, workflowMetadataId) }) + } + try { + // Delete the monitors workflow metadata + val deleteMonitorWorkflowMetadataResponse: BulkByScrollResponse = client.suspendUntil { + DeleteByQueryRequestBuilder(this, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.SCHEDULED_JOBS_INDEX) + .filter(QueryBuilders.idsQuery().addIds(*metadataIdsToDelete.toTypedArray())) + .execute(it) } + } catch (t: Exception) { + log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t) } - actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) + actionListener.onResponse(deleteWorkflowResponse) } else { actionListener.onFailure( AlertingException( @@ -189,36 +197,45 @@ class TransportDeleteWorkflowAction @Inject constructor( ) ) } else { + log.error("Failed to delete workflow $workflowId", t) actionListener.onFailure(AlertingException.wrap(t)) } } } - private suspend fun deleteMonitors(monitorIds: List, refreshPolicy: RefreshPolicy) { - if (monitorIds.isEmpty()) - return - - for (monitorId in monitorIds) { - val deleteRequest = DeleteMonitorRequest(monitorId, refreshPolicy) - val searchResponse: DeleteMonitorResponse = client.suspendUntil { - AlertingPluginInterface.deleteMonitor(this as NodeClient, deleteRequest, it) + /** + * Tries to delete the given list of the monitors. Return value contains all the monitorIds for which deletion failed + * @param monitorIds list of monitor ids to be deleted + * @param refreshPolicy + * @return list of the monitors that were not deleted + */ + private suspend fun tryDeletingMonitors(monitors: List, refreshPolicy: RefreshPolicy): List { + val nonDeletedMonitorIds = mutableListOf() + for (monitor in monitors) { + try { + DeleteMonitorService.deleteMonitor(monitor, refreshPolicy) + } catch (ex: Exception) { + log.error("failed to delete delegate monitor ${monitor.id} for $workflowId") + nonDeletedMonitorIds.add(monitor.id) } } + return nonDeletedMonitorIds } /** - * Returns lit of monitor ids belonging only to a given workflow + * Returns lit of monitor ids belonging only to a given workflow. + * if filterBy is enabled, it filters and returns only those monitors which user has permission to delete. * @param workflowIdToBeDeleted Id of the workflow that should be deleted * @param monitorIds List of delegate monitor ids (underlying monitor ids) */ - private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List, user: User?): List { + private suspend fun getDeletableDelegates(workflowIdToBeDeleted: String, monitorIds: List, user: User?): List { // Retrieve monitors belonging to another workflows val queryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery("_id", workflowIdToBeDeleted)).filter( QueryBuilders.nestedQuery( - Workflow.WORKFLOW_DELEGATE_PATH, + WORKFLOW_DELEGATE_PATH, QueryBuilders.boolQuery().must( QueryBuilders.termsQuery( - Workflow.WORKFLOW_MONITOR_PATH, + WORKFLOW_MONITOR_PATH, monitorIds ) ), @@ -230,11 +247,6 @@ class TransportDeleteWorkflowAction @Inject constructor( .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) .source(SearchSourceBuilder().query(queryBuilder)) - // Check if user can access the monitors(since the monitors could get modified later and the user might not have the backend roles to access the monitors) - if (user != null && filterByEnabled) { - addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") - } - val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) } val workflows = searchResponse.hits.hits.map { hit -> @@ -251,9 +263,35 @@ class TransportDeleteWorkflowAction @Inject constructor( } workflow.copy(id = hit.id, version = hit.version) } - val workflowMonitors = workflows.filter { it.id != workflowIdToBeDeleted }.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct() + val workflowMonitors = workflows.flatMap { (it.inputs[0] as CompositeInput).getMonitorIds() }.distinct() // Monitors that can be deleted -> all workflow delegates - monitors belonging to different workflows - return monitorIds.minus(workflowMonitors.toSet()) + val deletableMonitorIds = monitorIds.minus(workflowMonitors.toSet()) + + // filtering further to get the list of monitors that user has permission to delete if filterby is enabled and user is not null + val query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", deletableMonitorIds)) + val searchSource = SearchSourceBuilder().query(query) + val monitorSearchRequest = SearchRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).source(searchSource) + + if (user != null && filterByEnabled) { + addFilter(user, monitorSearchRequest.source(), "monitor.user.backend_roles.keyword") + } + + val searchMonitorResponse: SearchResponse = client.suspendUntil { search(monitorSearchRequest, it) } + if (searchMonitorResponse.isTimedOut) { + throw OpenSearchException("Cannot determine that the ${ScheduledJob.SCHEDULED_JOBS_INDEX} index is healthy") + } + val deletableMonitors = mutableListOf() + for (hit in searchMonitorResponse.hits) { + XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, hit.sourceAsString + ).use { hitsParser -> + val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version) as Monitor + deletableMonitors.add(monitor) + } + } + + return deletableMonitors } private suspend fun getWorkflow(): Workflow { @@ -274,14 +312,19 @@ class TransportDeleteWorkflowAction @Inject constructor( return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow } - private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse { + private suspend fun deleteWorkflow(deleteRequest: DeleteRequest): DeleteResponse { log.debug("Deleting the workflow with id ${deleteRequest.id()}") return client.suspendUntil { delete(deleteRequest, it) } } - // TODO - use once the workflow metadata concept is introduced - private suspend fun deleteMetadata(workflow: Workflow) { - val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata") + + private suspend fun deleteWorkflowMetadata(workflow: Workflow) { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, WorkflowMetadata.getId(workflow.id)) val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) } } } + + companion object { + const val WORKFLOW_DELEGATE_PATH = "workflow.inputs.composite_input.sequence.delegates" + const val WORKFLOW_MONITOR_PATH = "workflow.inputs.composite_input.sequence.delegates.monitor_id" + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt index bd90d569a..45c2a5cd1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt @@ -47,7 +47,11 @@ class TransportExecuteWorkflowAction @Inject constructor( ) : HandledTransportAction( ExecuteWorkflowAction.NAME, transportService, actionFilters, ::ExecuteWorkflowRequest ) { - override fun doExecute(task: Task, execWorkflowRequest: ExecuteWorkflowRequest, actionListener: ActionListener) { + override fun doExecute( + task: Task, + execWorkflowRequest: ExecuteWorkflowRequest, + actionListener: ActionListener, + ) { val userStr = client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) log.debug("User and roles string from thread context: $userStr") val user: User? = User.parse(userStr) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index a68a32ab8..60f39a063 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -140,7 +140,8 @@ class TransportIndexWorkflowAction @Inject constructor( ) { if (transformedRequest.rbacRoles?.stream()?.anyMatch { !user.backendRoles.contains(it) } == true) { log.error( - "User specified backend roles, ${transformedRequest.rbacRoles}, that they don' have access to. User backend roles: ${user.backendRoles}" + "User specified backend roles, ${transformedRequest.rbacRoles}, " + + "that they don' have access to. User backend roles: ${user.backendRoles}" ) actionListener.onFailure( AlertingException.wrap( @@ -153,7 +154,8 @@ class TransportIndexWorkflowAction @Inject constructor( return } else if (transformedRequest.rbacRoles?.isEmpty() == true) { log.error( - "Non-admin user are not allowed to specify an empty set of backend roles. Please don't pass in the parameter or pass in at least one backend role." + "Non-admin user are not allowed to specify an empty set of backend roles. " + + "Please don't pass in the parameter or pass in at least one backend role." ) actionListener.onFailure( AlertingException.wrap( @@ -543,7 +545,9 @@ class TransportIndexWorkflowAction @Inject constructor( val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor) if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) { - throw AlertingException.wrap(IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices")) + throw AlertingException.wrap( + IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices") + ) } } } @@ -604,7 +608,7 @@ class TransportIndexWorkflowAction @Inject constructor( val searchSource = SearchSourceBuilder().query(query) val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) - if (user != null && filterByEnabled) { + if (user != null && !isAdmin(user) && filterByEnabled) { addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") } @@ -703,7 +707,11 @@ class TransportIndexWorkflowAction @Inject constructor( val indices = mutableListOf() val searchInputs = - monitors.flatMap { monitor -> monitor.inputs.filter { it.name() == SearchInput.SEARCH_FIELD || it.name() == DocLevelMonitorInput.DOC_LEVEL_INPUT_FIELD } } + monitors.flatMap { monitor -> + monitor.inputs.filter { + it.name() == SearchInput.SEARCH_FIELD || it.name() == DocLevelMonitorInput.DOC_LEVEL_INPUT_FIELD + } + } searchInputs.forEach { val inputIndices = if (it.name() == SearchInput.SEARCH_FIELD) (it as SearchInput).indices else (it as DocLevelMonitorInput).indices diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index d9c6cd1e3..6611f8676 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -251,7 +251,10 @@ fun randomWorkflow( enabledTime = enabledTime, workflowType = WorkflowType.COMPOSITE, user = user, - inputs = listOf(CompositeInput(Sequence(delegates))) + inputs = listOf(CompositeInput(Sequence(delegates))), + version = -1L, + schemaVersion = 0, + triggers = emptyList(), ) } @@ -274,7 +277,10 @@ fun randomWorkflowWithDelegates( enabledTime = enabledTime, workflowType = WorkflowType.COMPOSITE, user = user, - inputs = listOf(CompositeInput(Sequence(delegates))) + inputs = listOf(CompositeInput(Sequence(delegates))), + version = -1L, + schemaVersion = 0, + triggers = emptyList() ) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt index 30a71ced5..6e98ea102 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowMonitorIT.kt @@ -5,7 +5,9 @@ package org.opensearch.alerting +import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase +import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.model.ChainedMonitorFindings import org.opensearch.commons.alerting.model.CompositeInput import org.opensearch.commons.alerting.model.DataSources @@ -482,6 +484,46 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } + fun `test delete workflow keeping delegate monitor`() { + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + + val monitorResponse = createMonitor(monitor)!! + + val workflowRequest = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflowRequest)!! + val workflowId = workflowResponse.id + val getWorkflowResponse = getWorkflowById(id = workflowResponse.id) + + assertNotNull(getWorkflowResponse) + assertEquals(workflowId, getWorkflowResponse.id) + + deleteWorkflow(workflowId, false) + // Verify that the workflow is deleted + try { + getWorkflowById(workflowId) + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetWorkflow Action error ", + it.contains("Workflow not found.") + ) + } + } + // Verify that the monitor is not deleted + val existingDelegate = getMonitorResponse(monitorResponse.id) + assertNotNull(existingDelegate) + } + fun `test delete workflow delegate monitor deleted`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) @@ -572,6 +614,19 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult assertEquals(2, monitorsRunResults.size) + val workflowMetadata = searchWorkflowMetadata(workflowId) + assertNotNull(workflowMetadata) + + val monitorMetadataId1 = getDelegateMonitorMetadataId(workflowMetadata, monitorResponse) + val monitorMetadata1 = searchMonitorMetadata(monitorMetadataId1) + assertNotNull(monitorMetadata1) + + val monitorMetadataId2 = getDelegateMonitorMetadataId(workflowMetadata, monitorResponse2) + val monitorMetadata2 = searchMonitorMetadata(monitorMetadataId2) + assertNotNull(monitorMetadata2) + + assertFalse(monitorMetadata1!!.id == monitorMetadata2!!.id) + deleteWorkflow(workflowId, true) // Verify that the workflow is deleted try { @@ -587,6 +642,31 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { // Verify that the workflow metadata is deleted try { searchWorkflowMetadata(workflowId) + fail("expected searchWorkflowMetadata method to throw exception") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetMonitor Action error ", + it.contains("List is empty") + ) + } + } + // Verify that the monitors metadata are deleted + try { + searchMonitorMetadata(monitorMetadataId1) + fail("expected searchMonitorMetadata method to throw exception") + } catch (e: Exception) { + e.message?.let { + assertTrue( + "Exception not returning GetMonitor Action error ", + it.contains("List is empty") + ) + } + } + + try { + searchMonitorMetadata(monitorMetadataId2) + fail("expected searchMonitorMetadata method to throw exception") } catch (e: Exception) { e.message?.let { assertTrue( @@ -597,7 +677,12 @@ class WorkflowMonitorIT : WorkflowSingleNodeTestCase() { } } - fun `test delete workflow delegate monitor not deleted`() { + private fun getDelegateMonitorMetadataId( + workflowMetadata: WorkflowMetadata?, + monitorResponse: IndexMonitorResponse, + ) = "${workflowMetadata!!.id}-${monitorResponse.id}-metadata" + + fun `test delete workflow delegate monitor part of another workflow not deleted`() { val docLevelInput = DocLevelMonitorInput( "description", listOf(index), listOf(DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3")) ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt index fe94c28a4..f2daf618b 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/WorkflowRunnerIT.kt @@ -7,13 +7,16 @@ package org.opensearch.alerting import org.junit.Assert import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.transport.WorkflowSingleNodeTestCase import org.opensearch.alerting.util.AlertingException import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AlertingActions import org.opensearch.commons.alerting.action.GetAlertsRequest +import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder +import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery @@ -118,14 +121,16 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { Assert.assertEquals(monitor2.name, monitorsRunResults[1].monitorName) Assert.assertEquals(1, monitorsRunResults[1].triggerResults.size) - assertAlerts(monitorResponse.id, customAlertsIndex1, 2) + val getAlertsResponse = assertAlerts(monitorResponse.id, customAlertsIndex1, 2) + assertAcknowledges(getAlertsResponse.alerts, monitorResponse.id, 2) assertFindings(monitorResponse.id, customFindingsIndex1, 2, 2, listOf("1", "2")) - assertAlerts(monitorResponse2.id, customAlertsIndex2, 1) + val getAlertsResponse2 = assertAlerts(monitorResponse2.id, customAlertsIndex2, 1) + assertAcknowledges(getAlertsResponse2.alerts, monitorResponse2.id, 1) assertFindings(monitorResponse2.id, customFindingsIndex2, 1, 1, listOf("2")) } - fun `test execute workflows with shared monitor delegates`() { + fun `test execute workflows with shared doc level monitor delegate`() { val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -184,9 +189,22 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertEquals(monitor.name, monitorsRunResults[0].monitorName) assertEquals(1, monitorsRunResults[0].triggerResults.size) + // Assert and not ack the alerts (in order to verify later on that all the alerts are generated) assertAlerts(monitorResponse.id, customAlertsIndex, 2) assertFindings(monitorResponse.id, customFindingsIndex, 2, 2, listOf("1", "2")) + // Verify workflow and monitor delegate metadata + val workflowMetadata = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse.workflowRunResult.executionId, + workflowMetadata!!.latestExecutionId + ) + val monitorMetadataId = "${monitorResponse.id}-${workflowMetadata.id}" + val monitorMetadata = searchMonitorMetadata(monitorMetadataId) + assertNotNull(monitorMetadata) + // Execute second workflow val workflowId1 = workflowResponse1.id val executeWorkflowResponse1 = executeWorkflow(workflowById1, workflowId1, false)!! val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult @@ -195,8 +213,134 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertEquals(monitor.name, monitorsRunResults1[0].monitorName) assertEquals(1, monitorsRunResults1[0].triggerResults.size) - assertAlerts(monitorResponse.id, customAlertsIndex, 2) + val getAlertsResponse = assertAlerts(monitorResponse.id, customAlertsIndex, 4) + assertAcknowledges(getAlertsResponse.alerts, monitorResponse.id, 4) assertFindings(monitorResponse.id, customFindingsIndex, 4, 4, listOf("1", "2", "1", "2")) + // Verify workflow and monitor delegate metadata + val workflowMetadata1 = searchWorkflowMetadata(id = workflowId1) + assertNotNull("Workflow metadata not initialized", workflowMetadata1) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse1.workflowRunResult.executionId, + workflowMetadata1!!.latestExecutionId + ) + val monitorMetadataId1 = "${monitorResponse.id}-${workflowMetadata1.id}" + val monitorMetadata1 = searchMonitorMetadata(monitorMetadataId1) + assertNotNull(monitorMetadata1) + // Verify that for two workflows two different doc level monitor metadata has been created + assertTrue("Different monitor is used in workflows", monitorMetadata!!.monitorId == monitorMetadata1!!.monitorId) + assertTrue(monitorMetadata.id != monitorMetadata1.id) + } + + fun `test execute workflows with shared doc level monitor delegate updating delegate datasource`() { + val docQuery = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + val monitorResponse = createMonitor(monitor)!! + + val workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + assertNotNull(workflowById) + + val workflow1 = randomWorkflow( + monitorIds = listOf(monitorResponse.id) + ) + val workflowResponse1 = upsertWorkflow(workflow1)!! + val workflowById1 = searchWorkflow(workflowResponse1.id) + assertNotNull(workflowById1) + + var testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + // Matches monitor1 + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + + testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16645, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "2", testDoc2) + + val workflowId = workflowResponse.id + val executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + val monitorsRunResults = executeWorkflowResponse.workflowRunResult.workflowRunResult + assertEquals(1, monitorsRunResults.size) + + assertEquals(monitor.name, monitorsRunResults[0].monitorName) + assertEquals(1, monitorsRunResults[0].triggerResults.size) + + assertAlerts(monitorResponse.id, AlertIndices.ALERT_INDEX, 2) + assertFindings(monitorResponse.id, AlertIndices.FINDING_HISTORY_WRITE_INDEX, 2, 2, listOf("1", "2")) + // Verify workflow and monitor delegate metadata + val workflowMetadata = searchWorkflowMetadata(id = workflowId) + assertNotNull("Workflow metadata not initialized", workflowMetadata) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse.workflowRunResult.executionId, + workflowMetadata!!.latestExecutionId + ) + val monitorMetadataId = "${monitorResponse.id}-${workflowMetadata.id}" + val monitorMetadata = searchMonitorMetadata(monitorMetadataId) + assertNotNull(monitorMetadata) + + val customAlertsIndex = "custom_alerts_index" + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val monitorId = monitorResponse.id + updateMonitor( + monitor = monitor.copy( + dataSources = DataSources( + alertsIndex = customAlertsIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ), + monitorId + ) + + // Execute second workflow + val workflowId1 = workflowResponse1.id + val executeWorkflowResponse1 = executeWorkflow(workflowById1, workflowId1, false)!! + val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult + assertEquals(1, monitorsRunResults1.size) + + assertEquals(monitor.name, monitorsRunResults1[0].monitorName) + assertEquals(1, monitorsRunResults1[0].triggerResults.size) + + // Verify alerts for the custom index + val getAlertsResponse = assertAlerts(monitorResponse.id, customAlertsIndex, 2) + assertAcknowledges(getAlertsResponse.alerts, monitorResponse.id, 2) + assertFindings(monitorResponse.id, customFindingsIndex, 2, 2, listOf("1", "2")) + + // Verify workflow and monitor delegate metadata + val workflowMetadata1 = searchWorkflowMetadata(id = workflowId1) + assertNotNull("Workflow metadata not initialized", workflowMetadata1) + assertEquals( + "Workflow metadata execution id not correct", + executeWorkflowResponse1.workflowRunResult.executionId, + workflowMetadata1!!.latestExecutionId + ) + val monitorMetadataId1 = "${monitorResponse.id}-${workflowMetadata1.id}" + val monitorMetadata1 = searchMonitorMetadata(monitorMetadataId1) + assertNotNull(monitorMetadata1) + // Verify that for two workflows two different doc level monitor metadata has been created + assertTrue("Different monitor is used in workflows", monitorMetadata!!.monitorId == monitorMetadata1!!.monitorId) + assertTrue(monitorMetadata.id != monitorMetadata1.id) } fun `test execute workflow verify workflow metadata`() { @@ -248,6 +392,10 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { executeWorkflowResponse.workflowRunResult.executionId, workflowMetadata!!.latestExecutionId ) + val monitorMetadataId = "${monitorResponse.id}-${workflowMetadata.id}" + val monitorMetadata = searchMonitorMetadata(monitorMetadataId) + assertNotNull(monitorMetadata) + // Second execution val executeWorkflowResponse1 = executeWorkflow(workflowById, workflowId, false)!! val monitorsRunResults1 = executeWorkflowResponse1.workflowRunResult.workflowRunResult @@ -260,6 +408,10 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { executeWorkflowResponse1.workflowRunResult.executionId, workflowMetadata1!!.latestExecutionId ) + val monitorMetadataId1 = "${monitorResponse.id}-${workflowMetadata1.id}" + assertTrue(monitorMetadataId == monitorMetadataId1) + val monitorMetadata1 = searchMonitorMetadata(monitorMetadataId1) + assertNotNull(monitorMetadata1) } fun `test execute workflow dryrun verify workflow metadata not created`() { @@ -315,7 +467,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertTrue(exception is NoSuchElementException) } - fun `test execute workflow with custom alerts and finding index with bucket level doc level delegates when bucket level delegate is used in chained finding`() { + fun `test execute workflow with custom alerts and finding index when bucket monitor is used in chained finding of doc monitor`() { val query = QueryBuilders.rangeQuery("test_strict_date_time") .gt("{{period_end}}||-10d") .lte("{{period_end}}") @@ -325,7 +477,8 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { ) val compositeAgg = CompositeAggregationBuilder("composite_agg", compositeSources) val input = SearchInput(indices = listOf(index), query = SearchSourceBuilder().size(0).query(query).aggregation(compositeAgg)) - // Bucket level monitor will reduce the size of matched doc ids on those that belong to a bucket that contains more than 1 document after term grouping + // Bucket level monitor will reduce the size of matched doc ids on those that belong + // to a bucket that contains more than 1 document after term grouping val triggerScript = """ params.docCount > 1 """.trimIndent() @@ -406,11 +559,13 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { for (monitorRunResults in executeWorkflowResponse.workflowRunResult.workflowRunResult) { if (bucketLevelMonitorResponse.monitor.name == monitorRunResults.monitorName) { val searchResult = monitorRunResults.inputResults.results.first() + @Suppress("UNCHECKED_CAST") val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> assertEquals("Incorrect search result", 3, buckets.size) - assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2) + val getAlertsResponse = assertAlerts(bucketLevelMonitorResponse.id, bucketCustomAlertsIndex, 2) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) assertFindings(bucketLevelMonitorResponse.id, bucketCustomFindingsIndex, 1, 4, listOf("1", "2", "3", "4")) } else { assertEquals(1, monitorRunResults.inputResults.results.size) @@ -422,13 +577,14 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val expectedTriggeredDocIds = listOf("1", "2", "3", "4") assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) - assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 4) + val getAlertsResponse = assertAlerts(docLevelMonitorResponse.id, docCustomAlertsIndex, 4) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 4) assertFindings(docLevelMonitorResponse.id, docCustomFindingsIndex, 4, 4, listOf("1", "2", "3", "4")) } } } - fun `test execute workflow with custom alerts and finding index with bucket level and doc level delegates when doc level delegate is used in chained finding`() { + fun `test execute workflow with custom alerts and finding index when doc level delegate is used in chained finding`() { val docQuery1 = DocLevelQuery(query = "test_field_1:\"test_value_2\"", name = "1") val docQuery2 = DocLevelQuery(query = "test_field_1:\"test_value_3\"", name = "2") @@ -512,11 +668,17 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { """.trimIndent() val queryLevelTrigger = randomQueryLevelTrigger(condition = Script(queryTriggerScript)) - val queryMonitorResponse = createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! + val queryMonitorResponse = + createMonitor(randomQueryLevelMonitor(inputs = listOf(queryMonitorInput), triggers = listOf(queryLevelTrigger)))!! // 1. docMonitor (chainedFinding = null) 2. bucketMonitor (chainedFinding = docMonitor) 3. docMonitor (chainedFinding = bucketMonitor) 4. queryMonitor (chainedFinding = docMonitor 3) var workflow = randomWorkflow( - monitorIds = listOf(docLevelMonitorResponse.id, bucketLevelMonitorResponse.id, docLevelMonitorResponse1.id, queryMonitorResponse.id) + monitorIds = listOf( + docLevelMonitorResponse.id, + bucketLevelMonitorResponse.id, + docLevelMonitorResponse1.id, + queryMonitorResponse.id + ) ) val workflowResponse = upsertWorkflow(workflow)!! val workflowById = searchWorkflow(workflowResponse.id) @@ -554,18 +716,37 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val expectedTriggeredDocIds = listOf("3", "4", "5", "6") assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) - assertAlerts(docLevelMonitorResponse.id, docLevelMonitorResponse.monitor.dataSources.alertsIndex, 4) - assertFindings(docLevelMonitorResponse.id, docLevelMonitorResponse.monitor.dataSources.findingsIndex, 4, 4, listOf("3", "4", "5", "6")) + val getAlertsResponse = + assertAlerts(docLevelMonitorResponse.id, docLevelMonitorResponse.monitor.dataSources.alertsIndex, 4) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse.id, 4) + assertFindings( + docLevelMonitorResponse.id, + docLevelMonitorResponse.monitor.dataSources.findingsIndex, + 4, + 4, + listOf("3", "4", "5", "6") + ) } // Verify second bucket level monitor execution, alerts and findings bucketLevelMonitorResponse.monitor.name -> { val searchResult = monitorRunResults.inputResults.results.first() + @Suppress("UNCHECKED_CAST") - val buckets = searchResult.stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> + val buckets = + searchResult + .stringMap("aggregations")?.stringMap("composite_agg")?.get("buckets") as List> assertEquals("Incorrect search result", 2, buckets.size) - assertAlerts(bucketLevelMonitorResponse.id, bucketLevelMonitorResponse.monitor.dataSources.alertsIndex, 2) - assertFindings(bucketLevelMonitorResponse.id, bucketLevelMonitorResponse.monitor.dataSources.findingsIndex, 1, 4, listOf("3", "4", "5", "6")) + val getAlertsResponse = + assertAlerts(bucketLevelMonitorResponse.id, bucketLevelMonitorResponse.monitor.dataSources.alertsIndex, 2) + assertAcknowledges(getAlertsResponse.alerts, bucketLevelMonitorResponse.id, 2) + assertFindings( + bucketLevelMonitorResponse.id, + bucketLevelMonitorResponse.monitor.dataSources.findingsIndex, + 1, + 4, + listOf("3", "4", "5", "6") + ) } // Verify third doc level monitor execution, alerts and findings docLevelMonitorResponse1.monitor.name -> { @@ -578,8 +759,16 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val expectedTriggeredDocIds = listOf("5", "6") assertEquals(expectedTriggeredDocIds, triggeredDocIds.sorted()) - assertAlerts(docLevelMonitorResponse1.id, docLevelMonitorResponse1.monitor.dataSources.alertsIndex, 2) - assertFindings(docLevelMonitorResponse1.id, docLevelMonitorResponse1.monitor.dataSources.findingsIndex, 2, 2, listOf("5", "6")) + val getAlertsResponse = + assertAlerts(docLevelMonitorResponse1.id, docLevelMonitorResponse1.monitor.dataSources.alertsIndex, 2) + assertAcknowledges(getAlertsResponse.alerts, docLevelMonitorResponse1.id, 2) + assertFindings( + docLevelMonitorResponse1.id, + docLevelMonitorResponse1.monitor.dataSources.findingsIndex, + 2, + 2, + listOf("5", "6") + ) } // Verify fourth query level monitor execution queryMonitorResponse.monitor.name -> { @@ -587,10 +776,14 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { val values = monitorRunResults.triggerResults.values assertEquals(1, values.size) @Suppress("UNCHECKED_CAST") - val totalHits = ((monitorRunResults.inputResults.results[0]["hits"] as Map)["total"] as Map) ["value"] + val totalHits = + ((monitorRunResults.inputResults.results[0]["hits"] as Map)["total"] as Map)["value"] assertEquals(2, totalHits) @Suppress("UNCHECKED_CAST") - val docIds = ((monitorRunResults.inputResults.results[0]["hits"] as Map)["hits"] as List>).map { it["_id"]!! } + val docIds = + ( + (monitorRunResults.inputResults.results[0]["hits"] as Map)["hits"] as List> + ).map { it["_id"]!! } assertEquals(listOf("5", "6"), docIds.sorted()) } } @@ -623,7 +816,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertNotNull(error) assertTrue(error is AlertingException) assertEquals(RestStatus.NOT_FOUND, (error as AlertingException).status) - assertEquals("Configured indices are not found: [$index]", (error as AlertingException).message) + assertEquals("Configured indices are not found: [$index]", error.message) } fun `test execute workflow wrong workflow id`() { @@ -667,7 +860,7 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { customFindingsIndex: String, findingSize: Int, matchedQueryNumber: Int, - relatedDocIds: List + relatedDocIds: List, ) { val findings = searchFindings(monitorId, customFindingsIndex) assertEquals("Findings saved for test monitor", findingSize, findings.size) @@ -681,8 +874,8 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { private fun assertAlerts( monitorId: String, customAlertsIndex: String, - alertSize: Int - ) { + alertSize: Int, + ): GetAlertsResponse { val alerts = searchAlerts(monitorId, customAlertsIndex) assertEquals("Alert saved for test monitor", alertSize, alerts.size) val table = Table("asc", "id", null, alertSize, 0, "") @@ -700,7 +893,15 @@ class WorkflowRunnerIT : WorkflowSingleNodeTestCase() { assertTrue(getAlertsResponse != null) assertTrue(getAlertsResponse.alerts.size == alertSize) - val alertIds = getAlertsResponse.alerts.map { it.id } + return getAlertsResponse + } + + private fun assertAcknowledges( + alerts: List, + monitorId: String, + alertSize: Int, + ) { + val alertIds = alerts.map { it.id } val acknowledgeAlertResponse = client().execute( AlertingActions.ACKNOWLEDGE_ALERTS_ACTION_TYPE, AcknowledgeAlertRequest(monitorId, alertIds, WriteRequest.RefreshPolicy.IMMEDIATE) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index e7d63c8a8..03f4e1a66 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -40,7 +40,7 @@ import org.opensearch.commons.alerting.model.Table import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.index.IndexService import org.opensearch.index.query.TermQueryBuilder -import org.opensearch.index.reindex.ReindexModulePlugin +import org.opensearch.index.reindex.ReindexPlugin import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.join.ParentJoinPlugin import org.opensearch.painless.PainlessPlugin diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt index 136b08cf5..8835f2f51 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/WorkflowSingleNodeTestCase.kt @@ -10,6 +10,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.ExecuteWorkflowRequest import org.opensearch.alerting.action.ExecuteWorkflowResponse +import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.json.JsonXContent @@ -91,6 +92,35 @@ abstract class WorkflowSingleNodeTestCase : AlertingSingleNodeTestCase() { }.first() } + protected fun searchMonitorMetadata( + id: String, + indices: String = ScheduledJob.SCHEDULED_JOBS_INDEX, + refresh: Boolean = true, + ): MonitorMetadata? { + try { + if (refresh) refreshIndex(indices) + } catch (e: Exception) { + logger.warn("Could not refresh index $indices because: ${e.message}") + return null + } + val ssb = SearchSourceBuilder() + ssb.version(true) + ssb.query(TermQueryBuilder("_id", id)) + val searchResponse = client().prepareSearch(indices).setRouting(id).setSource(ssb).get() + + return searchResponse.hits.hits.map { it -> + val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + lateinit var monitorMetadata: MonitorMetadata + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + xcp.nextToken() + when (xcp.currentName()) { + "metadata" -> monitorMetadata = MonitorMetadata.parse(xcp) + } + } + monitorMetadata.copy(id = it.id) + }.first() + } + protected fun upsertWorkflow( workflow: Workflow, id: String = Workflow.NO_ID,