Skip to content

Commit

Permalink
BugFix: Notification integration issues (opensearch-project#339)
Browse files Browse the repository at this point in the history
Signed-off-by: Ravi Thaluru <[email protected]>
  • Loading branch information
thalurur authored Apr 19, 2022
1 parent 846e0a1 commit 39368bb
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 13 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ dependencies {
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "commons-codec:commons-codec:${versions.commonscodec}"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
implementation "org.apache.httpcomponents:httpcore:4.4.12"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ object ManagedIndexRunner :
if (executedManagedIndexMetaData.isFailed) {
try {
// if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message
// publishErrorNotification(policy, executedManagedIndexMetaData)
publishErrorNotification(policy, executedManagedIndexMetaData)
} catch (e: Exception) {
logger.error("Failed to publish error notification", e)
val errorMessage = e.message ?: "Failed to publish error notification"
Expand Down Expand Up @@ -783,7 +783,7 @@ object ManagedIndexRunner :
errorNotificationRetryPolicy.retry(logger) {
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam
try {
val compiledMessage = compileTemplate(scriptService, action.messageTemplate, context.metadata)
action.destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(context.client)
action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage)
action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage, context.user)
// publish and send throws an error for any invalid responses so its safe to assume if we reach this point it was successful
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.util
import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.commons.destination.message.LegacyBaseMessage
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest
Expand Down Expand Up @@ -45,18 +47,37 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) {
/**
* Extension function for publishing a notification to a channel in the Notification plugin.
*/
suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) {
suspend fun Channel.sendNotification(
client: Client,
title: String,
managedIndexMetaData: ManagedIndexMetaData,
compiledMessage: String,
user: User?
) {
val channel = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
managedIndexMetaData.getEventSource(title),
ChannelMessage(compiledMessage, null, null),
listOf(channel.id),
it
)
client.threadPool().threadContext.stashContext().use {
// We need to set the user context information in the thread context for notification plugin to correctly resolve the user object
client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user))
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
managedIndexMetaData.getEventSource(title),
ChannelMessage(compiledMessage, null, null),
listOf(channel.id),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}

private fun generateUserString(user: User?): String {
if (user == null) return ""
val backendRoles = user.backendRoles.joinToString(",")
val roles = user.roles.joinToString(",")
val requestedTenant = user.requestedTenant
val userName = user.name
return "$userName|$backendRoles|$roles|$requestedTenant"
}

fun ManagedIndexMetaData.getEventSource(title: String): EventSource {
Expand Down

0 comments on commit 39368bb

Please sign in to comment.