Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notification security fix #852

Merged
merged 4 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.util.destinationmigration.publishLegacyNotificati
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isTestAction
import org.opensearch.alerting.util.use
import org.opensearch.client.node.NodeClient
import org.opensearch.common.Strings
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -62,16 +63,24 @@ abstract class MonitorRunner {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
withClosableContext(
InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
val client = monitorCtx.client
client!!.threadPool().threadContext.stashContext().use {
withClosableContext(
InjectorContextElement(
monitor.id,
monitorCtx.settings!!,
monitorCtx.threadPool!!.threadContext,
monitor.user?.roles,
monitor.user
)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
}
}
}
ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package org.opensearch.alerting.opensearchapi

import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.delay
Expand All @@ -18,7 +22,6 @@ import org.opensearch.action.search.ShardSearchFailure
import org.opensearch.client.OpenSearchClient
import org.opensearch.common.settings.Settings
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.common.util.concurrent.ThreadContext.StoredContext
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.InjectSecurity
Expand All @@ -32,10 +35,6 @@ import org.opensearch.rest.RestStatus.BAD_GATEWAY
import org.opensearch.rest.RestStatus.GATEWAY_TIMEOUT
import org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE
import org.opensearch.search.builder.SearchSourceBuilder
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

/** Convert an object to maps and lists representation */
fun ToXContent.convertToMap(): Map<String, Any> {
Expand Down Expand Up @@ -171,28 +170,13 @@ suspend fun <T> NotificationsPluginInterface.suspendUntil(block: NotificationsPl
})
}

/**
* Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread.
*
* @param threadContext - a [ThreadContext] instance
*/
class ElasticThreadContextElement(private val threadContext: ThreadContext) : ThreadContextElement<Unit> {

companion object Key : CoroutineContext.Key<ElasticThreadContextElement>
private var context: StoredContext = threadContext.newStoredContext(true)

override val key: CoroutineContext.Key<*>
get() = Key

override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
this.context = threadContext.stashContext()
}

override fun updateThreadContext(context: CoroutineContext) = this.context.close()
}

class InjectorContextElement(id: String, settings: Settings, threadContext: ThreadContext, private val roles: List<String>?) :
ThreadContextElement<Unit> {
class InjectorContextElement(
id: String,
settings: Settings,
threadContext: ThreadContext,
private val roles: List<String>?,
private val user: User? = null
) : ThreadContextElement<Unit> {

companion object Key : CoroutineContext.Key<InjectorContextElement>
override val key: CoroutineContext.Key<*>
Expand All @@ -202,6 +186,8 @@ class InjectorContextElement(id: String, settings: Settings, threadContext: Thre

override fun updateThreadContext(context: CoroutineContext) {
rolesInjectorHelper.injectRoles(roles)
// This is from where plugins extract backend roles. It should be passed when calling APIs of other plugins
rolesInjectorHelper.injectUserInfo(user)
}

override fun restoreThreadContext(context: CoroutineContext, oldState: Unit) {
Expand Down