From 1d1d62ee49fb70dda7348598a2972f2a22d3b648 Mon Sep 17 00:00:00 2001 From: Ravi <6005951+thalurur@users.noreply.github.com> Date: Tue, 19 Apr 2022 11:15:07 -0700 Subject: [PATCH] BugFix: Notification integration issues (#339) Signed-off-by: Ravi Thaluru --- build.gradle | 2 + .../ManagedIndexRunner.kt | 4 +- .../notification/AttemptNotificationStep.kt | 2 +- .../util/NotificationUtils.kt | 41 ++++++++++++++----- 4 files changed, 36 insertions(+), 13 deletions(-) diff --git a/build.gradle b/build.gradle index 6e36f5cf3..c2cdb3c6f 100644 --- a/build.gradle +++ b/build.gradle @@ -174,6 +174,8 @@ dependencies { implementation "org.opensearch:common-utils:${common_utils_version}" implementation "com.github.seancfoley:ipaddress:5.3.3" implementation "commons-codec:commons-codec:${versions.commonscodec}" + implementation "org.apache.httpcomponents:httpclient:4.5.13" + implementation "org.apache.httpcomponents:httpcore:4.4.12" testImplementation "org.opensearch.test:framework:${opensearch_version}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index a21efdd4a..191e81016 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -403,7 +403,7 @@ object ManagedIndexRunner : if (executedManagedIndexMetaData.isFailed) { try { // if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message - // publishErrorNotification(policy, executedManagedIndexMetaData) + publishErrorNotification(policy, executedManagedIndexMetaData) } catch (e: Exception) { logger.error("Failed to publish error notification", e) val errorMessage = e.message ?: "Failed to publish error notification" @@ -783,7 +783,7 @@ object ManagedIndexRunner : errorNotificationRetryPolicy.retry(logger) { val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData) destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client) - channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage) + channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt index f531cbe90..22953bc61 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/notification/AttemptNotificationStep.kt @@ -30,7 +30,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam try { val compiledMessage = compileTemplate(scriptService, action.messageTemplate, context.metadata) action.destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(context.client) - action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage) + action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage, context.user) // publish and send throws an error for any invalid responses so its safe to assume if we reach this point it was successful stepStatus = StepStatus.COMPLETED info = mapOf("message" to getSuccessMessage(indexName)) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt index db65accf0..b577579f8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/util/NotificationUtils.kt @@ -9,6 +9,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.util import org.opensearch.OpenSearchStatusException import org.opensearch.client.Client import org.opensearch.client.node.NodeClient +import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT +import org.opensearch.commons.authuser.User import org.opensearch.commons.destination.message.LegacyBaseMessage import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest @@ -45,18 +47,37 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) { /** * Extension function for publishing a notification to a channel in the Notification plugin. */ -suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) { +suspend fun Channel.sendNotification( + client: Client, + title: String, + managedIndexMetaData: ManagedIndexMetaData, + compiledMessage: String, + user: User? +) { val channel = this - val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { - this.sendNotification( - (client as NodeClient), - managedIndexMetaData.getEventSource(title), - ChannelMessage(compiledMessage, null, null), - listOf(channel.id), - it - ) + client.threadPool().threadContext.stashContext().use { + // We need to set the user context information in the thread context for notification plugin to correctly resolve the user object + client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user)) + val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + (client as NodeClient), + managedIndexMetaData.getEventSource(title), + ChannelMessage(compiledMessage, null, null), + listOf(channel.id), + it + ) + } + validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) } - validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId) +} + +private fun generateUserString(user: User?): String { + if (user == null) return "" + val backendRoles = user.backendRoles.joinToString(",") + val roles = user.roles.joinToString(",") + val requestedTenant = user.requestedTenant + val userName = user.name + return "$userName|$backendRoles|$roles|$requestedTenant" } fun ManagedIndexMetaData.getEventSource(title: String): EventSource {