diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index a21efdd4a..191e81016 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -403,7 +403,7 @@ object ManagedIndexRunner : if (executedManagedIndexMetaData.isFailed) { try { // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message - // publishErrorNotification(policy, executedManagedIndexMetaData) + publishErrorNotification(policy, executedManagedIndexMetaData) } catch (e: Exception) { logger.error("Failed to publish error notification", e) val errorMessage = e.message ?: "Failed to publish error notification" @@ -783,7 +783,7 @@ object ManagedIndexRunner : errorNotificationRetryPolicy.retry(logger) { val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData) destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) - channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage) + channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt index db65accf0..228d76b05 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt @@ -9,6 +9,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.util import org.opensearch.OpenSearchStatusException import org.opensearch.client.Client import org.opensearch.client.node.NodeClient +import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT +import org.opensearch.commons.authuser.User import org.opensearch.commons.destination.message.LegacyBaseMessage import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest @@ -45,18 +47,28 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) { /** * Extension function for publishing a notification to a channel in the Notification plugin. */ -suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) { +suspend fun Channel.sendNotification( + client: Client, + title: String, + managedIndexMetaData: ManagedIndexMetaData, + compiledMessage: String, + user: User? +) { val channel = this - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - managedIndexMetaData.getEventSource(title), - ChannelMessage(compiledMessage, null, null), - listOf(channel.id), - it - ) + client.threadPool().threadContext.stashContext().use { + // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, user?.toString()) + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + managedIndexMetaData.getEventSource(title), + ChannelMessage(compiledMessage, null, null), + listOf(channel.id), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } - validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } fun ManagedIndexMetaData.getEventSource(title: String): EventSource {