diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index 170bddb62..ce1d855c3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -24,6 +24,7 @@ import org.opensearch.alerting.util.destinationmigration.publishLegacyNotificati import org.opensearch.alerting.util.destinationmigration.sendNotification import org.opensearch.alerting.util.isAllowed import org.opensearch.alerting.util.isTestAction +import org.opensearch.alerting.util.use import org.opensearch.client.node.NodeClient import org.opensearch.common.Strings import org.opensearch.commons.alerting.model.Monitor @@ -62,16 +63,24 @@ abstract class MonitorRunner { throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}") } if (!dryrun) { - val roles = MonitorRunnerService.getRolesForMonitor(monitor) - withClosableContext( - InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles) - ) { - actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( - action, - monitorCtx, - actionOutput[Action.SUBJECT], - actionOutput[Action.MESSAGE]!! - ) + val client = monitorCtx.client + client!!.threadPool().threadContext.stashContext().use { + withClosableContext( + InjectorContextElement( + monitor.id, + monitorCtx.settings!!, + monitorCtx.threadPool!!.threadContext, + monitor.user?.roles, + monitor.user + ) + ) { + actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification( + action, + monitorCtx, + actionOutput[Action.SUBJECT], + actionOutput[Action.MESSAGE]!! + ) + } } } ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null) diff --git a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt index 019d484f1..4c79d4e4f 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt @@ -18,7 +18,6 @@ import org.opensearch.action.search.ShardSearchFailure import org.opensearch.client.OpenSearchClient import org.opensearch.common.settings.Settings import org.opensearch.common.util.concurrent.ThreadContext -import org.opensearch.common.util.concurrent.ThreadContext.StoredContext import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity @@ -171,28 +170,13 @@ suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPl }) } -/** - * Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread. - * - * @param threadContext - a [ThreadContext] instance - */ -class ElasticThreadContextElement(private val threadContext: ThreadContext) : ThreadContextElement { - - companion object Key : CoroutineContext.Key - private var context: StoredContext = threadContext.newStoredContext(true) - - override val key: CoroutineContext.Key<*> - get() = Key - - override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) { - this.context = threadContext.stashContext() - } - - override fun updateThreadContext(context: CoroutineContext) = this.context.close() -} - -class InjectorContextElement(id: String, settings: Settings, threadContext: ThreadContext, private val roles: List?) : - ThreadContextElement { +class InjectorContextElement( + id: String, + settings: Settings, + threadContext: ThreadContext, + private val roles: List?, + private val user: User? = null +) : ThreadContextElement { companion object Key : CoroutineContext.Key override val key: CoroutineContext.Key<*> @@ -202,6 +186,8 @@ class InjectorContextElement(id: String, settings: Settings, threadContext: Thre override fun updateThreadContext(context: CoroutineContext) { rolesInjectorHelper.injectRoles(roles) + // This is from where plugins extract backend roles. It should be passed when calling APIs of other plugins + rolesInjectorHelper.injectUserInfo(user) } override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {