Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BugFix: Notification integration issues #339

Merged
merged 2 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ dependencies {
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "commons-codec:commons-codec:${versions.commonscodec}"
implementation "org.apache.httpcomponents:httpclient:4.5.13"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why these are added along with this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems these are needed for the sending notification to legacy custom_webhook endpoints, the libraries are no longer in class path - I am not entirely sure where this was removed. Since IM needs it bundling it as part of the zip.

The PR is meant to fix bugs with notification plugin integration and since this is one of the bugs fixed it - I added a line in overview for this. However, I think I missed the renaming of PR - I updated it now

implementation "org.apache.httpcomponents:httpcore:4.4.12"

testImplementation "org.opensearch.test:framework:${opensearch_version}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ object ManagedIndexRunner :
if (executedManagedIndexMetaData.isFailed) {
try {
// if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message
// publishErrorNotification(policy, executedManagedIndexMetaData)
publishErrorNotification(policy, executedManagedIndexMetaData)
} catch (e: Exception) {
logger.error("Failed to publish error notification", e)
val errorMessage = e.message ?: "Failed to publish error notification"
Expand Down Expand Up @@ -783,7 +783,7 @@ object ManagedIndexRunner :
errorNotificationRetryPolicy.retry(logger) {
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class AttemptNotificationStep(private val action: NotificationAction) : Step(nam
try {
val compiledMessage = compileTemplate(scriptService, action.messageTemplate, context.metadata)
action.destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(context.client)
action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage)
action.channel?.sendNotification(context.client, CHANNEL_TITLE, context.metadata, compiledMessage, context.user)
// publish and send throws an error for any invalid responses so its safe to assume if we reach this point it was successful
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ package org.opensearch.indexmanagement.indexstatemanagement.util
import org.opensearch.OpenSearchStatusException
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT
import org.opensearch.commons.authuser.User
import org.opensearch.commons.destination.message.LegacyBaseMessage
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.LegacyPublishNotificationRequest
Expand Down Expand Up @@ -45,18 +47,37 @@ suspend fun LegacyBaseMessage.publishLegacyNotification(client: Client) {
/**
* Extension function for publishing a notification to a channel in the Notification plugin.
*/
suspend fun Channel.sendNotification(client: Client, title: String, managedIndexMetaData: ManagedIndexMetaData, compiledMessage: String) {
suspend fun Channel.sendNotification(
client: Client,
title: String,
managedIndexMetaData: ManagedIndexMetaData,
compiledMessage: String,
user: User?
) {
val channel = this
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
managedIndexMetaData.getEventSource(title),
ChannelMessage(compiledMessage, null, null),
listOf(channel.id),
it
)
client.threadPool().threadContext.stashContext().use {
// We need to set the user context information in the thread context for notification plugin to correctly resolve the user object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we setting the user object twice in the threadContext? Here and above the calling stack in the runner?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the stash context will remove all the set variables but yes, we are setting it twice but in different format and in a different location in thread context

It seems notification plugin has special requirement for internal transport action calls than rest of the cluster

client.threadPool().threadContext.putTransient(OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, generateUserString(user))
val res: SendNotificationResponse = NotificationsPluginInterface.suspendUntil {
this.sendNotification(
(client as NodeClient),
managedIndexMetaData.getEventSource(title),
ChannelMessage(compiledMessage, null, null),
listOf(channel.id),
it
)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}
validateResponseStatus(res.getStatus(), res.notificationEvent.eventSource.referenceId)
}

private fun generateUserString(user: User?): String {
if (user == null) return ""
val backendRoles = user.backendRoles.joinToString(",")
val roles = user.roles.joinToString(",")
val requestedTenant = user.requestedTenant
val userName = user.name
return "$userName|$backendRoles|$roles|$requestedTenant"
}

fun ManagedIndexMetaData.getEventSource(title: String): EventSource {
Expand Down