From 37987b7507dd9ab74ec1e77c49af897e0b22f2b1 Mon Sep 17 00:00:00 2001 From: Stevan Buzejic Date: Mon, 17 Apr 2023 23:01:09 +0200 Subject: [PATCH] =?UTF-8?q?Refactored=20workflowIndexing=20validation=20-?= =?UTF-8?q?=20removed=20coroutine=20and=20contex=E2=80=A6=20(#857)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Refactored workflowIndexing validation - removed coroutine and context client context lost Signed-off-by: Stevan Buzejic * refactored getting the workflows Signed-off-by: Stevan Buzejic * Changed the logic according to secure test findings Signed-off-by: Stevan Buzejic * [Backport 2.x] Notification security fix (#861) (#863) * Notification security fix (#852) * added injecting whole user object in threadContext before calling notification APIs so that backend roles are available to notification plugin * compile fix * refactored user_info injection to use InjectSecurity * ktlint fix --------- (cherry picked from commit e0b7a5a7905b977e58d80e3b9134b14893d122b0) * remove unneeded import --------- Signed-off-by: Ashish Agrawal Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Petar Dzepina Co-authored-by: Ashish Agrawal * Stashed user together with it's roles Signed-off-by: Stevan Buzejic --------- Signed-off-by: Stevan Buzejic Signed-off-by: Ashish Agrawal Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Co-authored-by: Petar Dzepina Co-authored-by: Ashish Agrawal --- .../TransportDeleteWorkflowAction.kt | 22 +- .../transport/TransportGetWorkflowAction.kt | 16 +- .../transport/TransportIndexWorkflowAction.kt | 331 +++++++++++------- 3 files changed, 238 insertions(+), 131 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index ccedfb17b..fb7ec9664 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -22,8 +22,10 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client @@ -53,6 +55,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +import java.util.UUID private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) /** @@ -64,7 +67,7 @@ class TransportDeleteWorkflowAction @Inject constructor( val client: Client, actionFilters: ActionFilters, val clusterService: ClusterService, - settings: Settings, + val settings: Settings, val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest @@ -149,7 +152,22 @@ class TransportDeleteWorkflowAction @Inject constructor( val deleteResponse = deleteWorkflow(workflow) if (deleteDelegateMonitors == true) { - deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + if (user == null) { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + } else { + // Un-stash the context + withClosableContext( + InjectorContextElement( + user.name.plus(UUID.randomUUID().toString()), + settings, + client.threadPool().threadContext, + user.roles, + user + ) + ) { + deleteMonitors(delegateMonitorIds, RefreshPolicy.IMMEDIATE) + } + } } actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version)) } else { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt index e09f1a91a..68df71e4a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetWorkflowAction.kt @@ -85,7 +85,21 @@ class TransportGetWorkflowAction @Inject constructor( xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.sourceAsBytesRef, XContentType.JSON ).use { xcp -> - workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow + val compositeMonitor = ScheduledJob.parse(xcp, response.id, response.version) + if (compositeMonitor is Workflow) { + workflow = compositeMonitor + } else { + log.error("Wrong monitor type returned") + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "Workflow not found.", + RestStatus.NOT_FOUND + ) + ) + ) + return + } // security is enabled and filterby is enabled if (!checkUserPermissionsWithResource( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index bc8f59b15..5fdd4490a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -1,3 +1,8 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + package org.opensearch.alerting.transport import kotlinx.coroutines.CoroutineScope @@ -24,8 +29,10 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_MAX_MONITORS import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT @@ -34,6 +41,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEO import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils +import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.util.isQueryLevelMonitor import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService @@ -66,6 +74,7 @@ import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +import java.util.UUID import java.util.stream.Collectors private val log = LogManager.getLogger(TransportIndexWorkflowAction::class.java) @@ -131,8 +140,7 @@ class TransportIndexWorkflowAction @Inject constructor( ) { if (transformedRequest.rbacRoles?.stream()?.anyMatch { !user.backendRoles.contains(it) } == true) { log.error( - "User specified backend roles, ${transformedRequest.rbacRoles}, " + - "that they don' have access to. User backend roles: ${user.backendRoles}" + "User specified backend roles, ${transformedRequest.rbacRoles}, that they don' have access to. User backend roles: ${user.backendRoles}" ) actionListener.onFailure( AlertingException.wrap( @@ -145,8 +153,7 @@ class TransportIndexWorkflowAction @Inject constructor( return } else if (transformedRequest.rbacRoles?.isEmpty() == true) { log.error( - "Non-admin user are not allowed to specify an empty set of backend roles. " + - "Please don't pass in the parameter or pass in at least one backend role." + "Non-admin user are not allowed to specify an empty set of backend roles. Please don't pass in the parameter or pass in at least one backend role." ) actionListener.onFailure( AlertingException.wrap( @@ -159,14 +166,27 @@ class TransportIndexWorkflowAction @Inject constructor( return } } + scope.launch { try { - validateRequest(client, transformedRequest, user) + validateMonitorAccess( + transformedRequest, + user, + client, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + // Stash the context and start the workflow creation + client.threadPool().threadContext.stashContext().use { + IndexWorkflowHandler(client, actionListener, transformedRequest, user).resolveUserAndStart() + } + } - // If the validation was successful - continue with the execution - client.threadPool().threadContext.stashContext().use { - IndexWorkflowHandler(client, actionListener, transformedRequest, user).resolveUserAndStart() - } + override fun onFailure(e: Exception) { + log.error("Error indexing workflow", e) + actionListener.onFailure(e) + } + } + ) } catch (e: Exception) { log.error("Failed to create workflow", e) if (e is IndexNotFoundException) { @@ -183,45 +203,11 @@ class TransportIndexWorkflowAction @Inject constructor( } } - /** - * Validates the request in several steps - * Checks if the user has appropriate backend roles and if he can access the given monitors and it's indices - */ - private suspend fun validateRequest(client: Client, request: IndexWorkflowRequest, user: User?) { - if (request.workflow.inputs.isEmpty()) - throw AlertingException.wrap(IllegalArgumentException("Input list can not be empty.")) - - if (request.workflow.inputs.size > 1) - throw AlertingException.wrap(IllegalArgumentException("Input list can contain only one element.")) - - if (request.workflow.inputs[0] !is CompositeInput) - throw AlertingException.wrap(IllegalArgumentException("When creating a workflow input must be CompositeInput")) - - val compositeInput = request.workflow.inputs[0] as CompositeInput - val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) - - if (monitorIds.isNullOrEmpty()) - throw AlertingException.wrap(IllegalArgumentException("Delegates list can not be empty.")) - - validateDuplicateDelegateMonitorReferenceExists(monitorIds) - validateSequenceOrdering(compositeInput.sequence.delegates) - validateChainedMonitorFindings(compositeInput.sequence.delegates) - - val monitors = getDelegateMonitors(user, monitorIds) - - validateDelegateMonitorsExist(monitorIds, monitors) - validateChainedMonitorFindingsMonitors(compositeInput.sequence.delegates, monitors) - - val indices = getMonitorIndices(monitors) - - validateIndicesAccess(indices, client) - } - inner class IndexWorkflowHandler( private val client: Client, private val actionListener: ActionListener, private val request: IndexWorkflowRequest, - private val user: User? + private val user: User?, ) { fun resolveUserAndStart() { scope.launch { @@ -318,7 +304,8 @@ class TransportIndexWorkflowAction @Inject constructor( actionListener.onFailure( AlertingException.wrap( OpenSearchStatusException( - "Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged", RestStatus.INTERNAL_SERVER_ERROR + "Create $SCHEDULED_JOBS_INDEX mappings call not acknowledged", + RestStatus.INTERNAL_SERVER_ERROR ) ) ) @@ -357,7 +344,12 @@ class TransportIndexWorkflowAction @Inject constructor( val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .source( + request.workflow.toXContentWithUser( + jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) .setIfSeqNo(request.seqNo) .setIfPrimaryTerm(request.primaryTerm) .timeout(indexTimeout) @@ -368,7 +360,12 @@ class TransportIndexWorkflowAction @Inject constructor( if (failureReasons != null) { log.error("Failed to create workflow: $failureReasons") actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + failureReasons.toString(), + indexResponse.status() + ) + ) ) return } @@ -391,7 +388,10 @@ class TransportIndexWorkflowAction @Inject constructor( if (!getResponse.isExists) { actionListener.onFailure( AlertingException.wrap( - OpenSearchStatusException("Workflow with ${request.workflowId} is not found", RestStatus.NOT_FOUND) + OpenSearchStatusException( + "Workflow with ${request.workflowId} is not found", + RestStatus.NOT_FOUND + ) ) ) return @@ -408,7 +408,14 @@ class TransportIndexWorkflowAction @Inject constructor( } private suspend fun onGetResponse(currentWorkflow: Workflow) { - if (!checkUserPermissionsWithResource(user, currentWorkflow.user, actionListener, "workfklow", request.workflowId)) { + if (!checkUserPermissionsWithResource( + user, + currentWorkflow.user, + actionListener, + "workfklow", + request.workflowId + ) + ) { return } @@ -441,14 +448,22 @@ class TransportIndexWorkflowAction @Inject constructor( // rolesToRemove: these are the backend roles to remove from the monitor val rolesToRemove = user.backendRoles - request.rbacRoles.orEmpty() // remove the monitor's roles with rolesToRemove and add any roles passed into the request.rbacRoles - val updatedRbac = currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty() + val updatedRbac = + currentWorkflow.user?.backendRoles.orEmpty() - rolesToRemove + request.rbacRoles.orEmpty() request.workflow = request.workflow.copy( user = User(user.name, updatedRbac, user.roles, user.customAttNames) ) } } else { request.workflow = request.workflow - .copy(user = User(user.name, currentWorkflow.user!!.backendRoles, user.roles, user.customAttNames)) + .copy( + user = User( + user.name, + currentWorkflow.user!!.backendRoles, + user.roles, + user.customAttNames + ) + ) } log.debug("Update workflow backend roles to: ${request.workflow.user?.backendRoles}") } @@ -456,7 +471,12 @@ class TransportIndexWorkflowAction @Inject constructor( request.workflow = request.workflow.copy(schemaVersion = IndexUtils.scheduledJobIndexSchemaVersion) val indexRequest = IndexRequest(SCHEDULED_JOBS_INDEX) .setRefreshPolicy(request.refreshPolicy) - .source(request.workflow.toXContentWithUser(jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) + .source( + request.workflow.toXContentWithUser( + jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) .id(request.workflowId) .setIfSeqNo(request.seqNo) .setIfPrimaryTerm(request.primaryTerm) @@ -467,7 +487,12 @@ class TransportIndexWorkflowAction @Inject constructor( val failureReasons = checkShardsFailure(indexResponse) if (failureReasons != null) { actionListener.onFailure( - AlertingException.wrap(OpenSearchStatusException(failureReasons.toString(), indexResponse.status())) + AlertingException.wrap( + OpenSearchStatusException( + failureReasons.toString(), + indexResponse.status() + ) + ) ) return } @@ -494,55 +519,58 @@ class TransportIndexWorkflowAction @Inject constructor( } } - private fun validateChainedMonitorFindings(delegates: List) { - val monitorIdOrderMap: Map = delegates.associate { it.monitorId to it.order } - delegates.forEach { - if (it.chainedMonitorFindings != null) { - if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings!!.monitorId) == false) { - throw AlertingException.wrap( - IllegalArgumentException( - "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} doesn't exist in sequence" - ) - ) - } - if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings!!.monitorId]!!) { - throw AlertingException.wrap( - IllegalArgumentException( - "Chained Findings Monitor ${it.chainedMonitorFindings!!.monitorId} should be executed before monitor ${it.monitorId}" - ) - ) - } - } - } - } - private fun validateChainedMonitorFindingsMonitors(delegates: List, monitorDelegates: List) { + infix fun List.equalsIgnoreOrder(other: List) = + this.size == other.size && this.toSet() == other.toSet() + val monitorsById = monitorDelegates.associateBy { it.id } delegates.forEach { + val delegateMonitor = monitorsById[it.monitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Delegate monitor ${it.monitorId} doesn't exist") + ) if (it.chainedMonitorFindings != null) { - val chainedFindingMonitor = monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( - IllegalArgumentException("Chained finding monitor doesn't exist") - ) + val chainedFindingMonitor = + monitorsById[it.chainedMonitorFindings!!.monitorId] ?: throw AlertingException.wrap( + IllegalArgumentException("Chained finding monitor doesn't exist") + ) if (chainedFindingMonitor.isQueryLevelMonitor()) { throw AlertingException.wrap(IllegalArgumentException("Query level monitor can't be part of chained findings")) } + + val delegateMonitorIndices = getMonitorIndices(delegateMonitor) + + val chainedMonitorIndices = getMonitorIndices(chainedFindingMonitor) + + if (!delegateMonitorIndices.equalsIgnoreOrder(chainedMonitorIndices)) { + throw AlertingException.wrap(IllegalArgumentException("Delegate monitor and it's chained finding monitor must query the same indices")) + } } } } - private fun validateSequenceOrdering(delegates: List) { - val orderSet = delegates.stream().filter { it.order > 0 }.map { it.order }.collect(Collectors.toSet()) - if (orderSet.size != delegates.size) { - throw AlertingException.wrap(IllegalArgumentException("Sequence ordering of delegate monitor shouldn't contain duplicate order values")) - } - } + /** + * Returns list of indices for the given monitor depending on it's type + */ + private fun getMonitorIndices(monitor: Monitor): List { + return when (monitor.monitorType) { + Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices } + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + if (isADMonitor(monitor)) monitor.inputs.flatMap { s -> (s as SearchInput).indices } + else { + val indices = mutableListOf() + for (input in monitor.inputs) { + when (input) { + is SearchInput -> indices.addAll(input.indices) + else -> indices + } + } + indices + } + } - private fun validateDuplicateDelegateMonitorReferenceExists( - monitorIds: MutableList - ) { - if (monitorIds.toSet().size != monitorIds.size) { - throw AlertingException.wrap(IllegalArgumentException("Duplicate delegates not allowed")) + else -> emptyList() } } @@ -560,30 +588,28 @@ class TransportIndexWorkflowAction @Inject constructor( } /** - * Returns monitors for the given ids if user has an access + * Validates monitor and indices access + * 1. Validates the monitor access (if the filterByEnabled is set to true - adds backend role filter) as admin + * 2. Unstashes the context and checks if the user can access the monitor indices */ - private suspend fun getDelegateMonitors( + private suspend fun validateMonitorAccess( + request: IndexWorkflowRequest, user: User?, - monitorIds: MutableList - ): List { + client: Client, + actionListener: ActionListener + ) { + val compositeInput = request.workflow.inputs[0] as CompositeInput + val monitorIds = compositeInput.sequence.delegates.stream().map { it.monitorId }.collect(Collectors.toList()) val query = QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("_id", monitorIds)) val searchSource = SearchSourceBuilder().query(query) - val monitorSearchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) - // TODO - Add secure tests once the Rest Action is created - if (user != null && filterByEnabled) { - addFilter(user, monitorSearchRequest.source(), "monitor.user.backend_roles.keyword") + val searchRequest = SearchRequest(SCHEDULED_JOBS_INDEX).source(searchSource) + + if (user != null && !isAdmin(user) && filterByEnabled) { + addFilter(user, searchRequest.source(), "monitor.user.backend_roles.keyword") } - val searchMonitorResponse: SearchResponse = client.suspendUntil { client.search(monitorSearchRequest, it) } + val searchMonitorResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } - if (searchMonitorResponse.status() != RestStatus.OK) { - throw AlertingException.wrap( - OpenSearchStatusException( - "User doesn't have read permissions for one or more configured monitors ${monitorIds.joinToString()}", - RestStatus.FORBIDDEN - ) - ) - } if (searchMonitorResponse.isTimedOut) { throw OpenSearchException("Cannot determine that the $SCHEDULED_JOBS_INDEX index is healthy") } @@ -597,7 +623,77 @@ class TransportIndexWorkflowAction @Inject constructor( monitors.add(monitor) } } - return monitors + if (monitors.isEmpty()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured monitors ${monitorIds.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + ) + return + } + // Validate delegates and it's chained findings + try { + validateDelegateMonitorsExist(monitorIds, monitors) + validateChainedMonitorFindingsMonitors(compositeInput.sequence.delegates, monitors) + } catch (e: Exception) { + actionListener.onFailure(e) + return + } + val indices = getMonitorIndices(monitors) + + val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) + .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) + + if (user == null) { + checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) + } else { + // Unstash the context and check if user with specified roles has indices access + withClosableContext( + InjectorContextElement( + user.name.plus(UUID.randomUUID().toString()), + settings, + client.threadPool().threadContext, + user.roles, + user + ) + ) { + checkIndicesAccess(client, indicesSearchRequest, indices, actionListener) + } + } + } + + /** + * Checks if the client can access the given indices + */ + private fun checkIndicesAccess( + client: Client, + indicesSearchRequest: SearchRequest?, + indices: MutableList, + actionListener: ActionListener, + ) { + client.search( + indicesSearchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse?) { + actionListener.onResponse(AcknowledgedResponse(true)) + } + + override fun onFailure(e: Exception) { + log.error("Error accessing the monitor indices", e) + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException( + "User doesn't have read permissions for one or more configured index ${indices.joinToString()}", + RestStatus.FORBIDDEN + ) + ) + ) + } + } + ) } /** @@ -615,25 +711,4 @@ class TransportIndexWorkflowAction @Inject constructor( } return indices } - - /** - * Checks if the user can access the monitor indices - */ - private suspend fun validateIndicesAccess( - indices: MutableList, - client: Client, - ) { - val indicesSearchRequest = SearchRequest().indices(*indices.toTypedArray()) - .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) - - val indicesSearchResponse: SearchResponse = client.suspendUntil { client.search(indicesSearchRequest, it) } - if (indicesSearchResponse.status() != RestStatus.OK) { - throw AlertingException.wrap( - OpenSearchStatusException( - "User doesn't have read permissions for one or more configured index ${indices.joinToString()}", - RestStatus.FORBIDDEN - ) - ) - } - } }