diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index a21efdd4a..191e81016 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -403,7 +403,7 @@ object ManagedIndexRunner : if (executedManagedIndexMetaData.isFailed) { try { // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message - // publishErrorNotification(policy, executedManagedIndexMetaData) + publishErrorNotification(policy, executedManagedIndexMetaData) } catch (e: Exception) { logger.error("Failed to publish error notification", e) val errorMessage = e.message ?: "Failed to publish error notification" @@ -783,7 +783,7 @@ object ManagedIndexRunner : errorNotificationRetryPolicy.retry(logger) { val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData) destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) - channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage) + channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt index f531cbe90..22953bc61 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -30,7 +30,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam try { val compiledMessage = compileTemplate(scriptService, action.messageTemplate, context.metadata) action.destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(context.client) - action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage) + action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage, context.user) // publish and send throws an error for any invalid responses so its safe to assume if we reach this point it was successful stepStatus = StepStatus.COMPLETED info = mapOf("message" to getSuccessMessage(indexName)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt index db65accf0..b577579f8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt @@ -9,6 +9,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.util import org.opensearch.OpenSearchStatusException import org.opensearch.client.Client import org.opensearch.client.node.NodeClient +import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT +import org.opensearch.commons.authuser.User import org.opensearch.commons.destination.message.LegacyBaseMessage import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest @@ -45,18 +47,37 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) { /** * Extension function for publishing a notification to a channel in the Notification plugin. */ -suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) { +suspend fun Channel.sendNotification( + client: Client, + title: String, + managedIndexMetaData: ManagedIndexMetaData, + compiledMessage: String, + user: User? +) { val channel = this - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - managedIndexMetaData.getEventSource(title), - ChannelMessage(compiledMessage, null, null), - listOf(channel.id), - it - ) + client.threadPool().threadContext.stashContext().use { + // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user)) + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + managedIndexMetaData.getEventSource(title), + ChannelMessage(compiledMessage, null, null), + listOf(channel.id), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } - validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) +} + +private fun generateUserString(user: User?): String { + if (user == null) return "" + val backendRoles = user.backendRoles.joinToString(",") + val roles = user.roles.joinToString(",") + val requestedTenant = user.requestedTenant + val userName = user.name + return "$userName|$backendRoles|$roles|$requestedTenant" } fun ManagedIndexMetaData.getEventSource(title: String): EventSource {