Skip to content

Commit

Permalink
Add user context for background jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Li <[email protected]>
  • Loading branch information
joshuali925 committed Sep 28, 2021
1 parent 5871859 commit 1c30567
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ package org.opensearch.reportsscheduler.notifications

import org.opensearch.action.ActionListener
import org.opensearch.client.node.NodeClient
import org.opensearch.common.util.concurrent.ThreadContext
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.notifications.NotificationConstants.FEATURE_REPORTS
import org.opensearch.commons.notifications.NotificationsPluginInterface
import org.opensearch.commons.notifications.action.SendNotificationResponse
Expand Down Expand Up @@ -43,17 +45,50 @@ internal object NotificationsActions {
/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String) {
fun send(delivery: ReportDefinition.Delivery, referenceId: String): SendNotificationResponse? {
return send(delivery, referenceId, "")
}

/**
* Send notifications based on delivery parameter
* @param delivery [ReportDefinition.Delivery] object
* @param referenceId [String] object
* @param userStr [String] object,
* @return [CreateReportDefinitionResponse]
*/
fun send(delivery: ReportDefinition.Delivery, referenceId: String, userStr: String?): SendNotificationResponse? {
if (userStr.isNullOrEmpty()) {
return sendNotificationHelper(delivery, referenceId)
}

var sendNotificationResponse: SendNotificationResponse? = null
client.threadPool().threadContext.stashContext().use {
client.threadPool().threadContext.putTransient(
ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT,
userStr
)
sendNotificationResponse = sendNotificationHelper(delivery, referenceId)
}
return sendNotificationResponse
}

private fun sendNotificationHelper(
delivery: ReportDefinition.Delivery,
referenceId: String
): SendNotificationResponse? {
log.info("$LOG_PREFIX:NotificationsActions-send")
var sendNotificationResponse: SendNotificationResponse? = null
NotificationsPluginInterface.sendNotification(
client,
EventSource(delivery.title, referenceId, FEATURE_REPORTS, SeverityType.INFO),
ChannelMessage(delivery.textDescription, delivery.htmlDescription, null),
delivery.configIds,
object : ActionListener<SendNotificationResponse> {
override fun onResponse(sendNotificationResponse: SendNotificationResponse) {
override fun onResponse(response: SendNotificationResponse) {
sendNotificationResponse = response
log.info("$LOG_PREFIX:NotificationsActions-send:$sendNotificationResponse")
}

Expand All @@ -62,5 +97,45 @@ internal object NotificationsActions {
}
}
)
return sendNotificationResponse
}

/**
* Executes the given [block] function on this resource and then closes it down correctly whether an exception
* is thrown or not.
*
* In case if the resource is being closed due to an exception occurred in [block], and the closing also fails with an exception,
* the latter is added to the [suppressed][java.lang.Throwable.addSuppressed] exceptions of the former.
*
* @param block a function to process this [AutoCloseable] resource.
* @return the result of [block] function invoked on this resource.
*/
@Suppress("TooGenericExceptionCaught")
private inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
var exception: Throwable? = null
try {
return block(this)
} catch (e: Throwable) {
exception = e
throw e
} finally {
closeFinally(exception)
}
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
*
* The suppressed exception is added to the list of suppressed exceptions of [cause] exception.
*/
@Suppress("TooGenericExceptionCaught")
private fun ThreadContext.StoredContext.closeFinally(cause: Throwable?) = when (cause) {
null -> close()
else -> try {
close()
} catch (closeException: Throwable) {
cause.addSuppressed(closeException)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

package org.opensearch.reportsscheduler.scheduler

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.opensearch.jobscheduler.spi.JobExecutionContext
import org.opensearch.jobscheduler.spi.ScheduledJobParameter
import org.opensearch.jobscheduler.spi.ScheduledJobRunner
Expand All @@ -35,10 +38,8 @@ import org.opensearch.reportsscheduler.index.ReportInstancesIndex
import org.opensearch.reportsscheduler.model.ReportDefinitionDetails
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.notifications.NotificationsActions
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import java.time.Instant

internal object ReportDefinitionJobRunner : ScheduledJobRunner {
Expand Down Expand Up @@ -69,8 +70,11 @@ internal object ReportDefinitionJobRunner : ScheduledJobRunner {
log.warn("$LOG_PREFIX:runJob-job creation failed for $reportInstance")
} else {
log.info("$LOG_PREFIX:runJob-created job:$id")
if (reportDefinitionDetails.reportDefinition.delivery != null)
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id)
if (reportDefinitionDetails.reportDefinition.delivery != null) {
val user = UserAccessManager.getUserFromAccess(job.access)
val userStr = user?.let { it.toString() } ?: ""
NotificationsActions.send(reportDefinitionDetails.reportDefinition.delivery, id, userStr)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,19 @@ internal object UserAccessManager {
fun validateUser(user: User?) {
if (isUserPrivateTenant(user) && user?.name == null) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User name not provided for private tenant access",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User name not provided for private tenant access",
RestStatus.FORBIDDEN
)
}
if (PluginSettings.isRbacEnabled()) {
// backend roles must be present
if (user?.backendRoles.isNullOrEmpty()) {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN)
throw OpenSearchStatusException(
"User doesn't have backend roles configured. Contact administrator.",
RestStatus.FORBIDDEN
)
}
}
}
Expand Down Expand Up @@ -96,6 +100,19 @@ internal object UserAccessManager {
return retList
}

/**
* Get user object from all user access info.
*/
fun getUserFromAccess(access: List<String>): User? {
if (access.isNullOrEmpty()) {
return null
}
val name = access.find { it.startsWith(USER_TAG) }?.substring(USER_TAG.length)
val backendRoles = access.filter { it.startsWith(ROLE_TAG) }.map { it.substring(ROLE_TAG.length) }
val roles = access.filter { it.startsWith(BACKEND_ROLE_TAG) }.map { it.substring(BACKEND_ROLE_TAG.length) }
return User(name, backendRoles, roles, listOf())
}

/**
* Get access info for search filtering
*/
Expand Down

0 comments on commit 1c30567

Please sign in to comment.