Skip to content

Commit

Permalink
Merge pull request #165 from akbhatta/main
Browse files Browse the repository at this point in the history
Taking RBAC settings from Alerting plugin default to false
  • Loading branch information
akbhatta authored Sep 22, 2021
2 parents dc55f62 + 1b08bff commit c3c8e2f
Show file tree
Hide file tree
Showing 15 changed files with 64 additions and 1,398 deletions.
19 changes: 1 addition & 18 deletions reports-scheduler/src/main/config/reports-scheduler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,7 @@
##

# configuration file for the reports scheduler plugin
opendistro.reports:
opensearch.reports:
general:
operationTimeoutMs: 60000 # 60 seconds, Minimum 100ms
defaultItemsQueryCount: 100 # default number of items to query
polling:
jobLockDurationSeconds: 300 # 5 Minutes, Minimum 10 seconds
minPollingDurationSeconds: 300 # 5 Minutes, Minimum 60 seconds
maxPollingDurationSeconds: 900 # 15 Minutes, Minimum 5 Minutes
maxLockRetries: 1 # Max number of retries to retry locking
access:
adminAccess: "AllReports"
# adminAccess values:
## Standard -> Admin user access follows standard user
## AllReports -> Admin user with "all_access" role can see all reports of all users.
filterBy: "NoFilter" # Applied when tenant != __user__
# filterBy values:
## NoFilter -> everyone see each other's reports
## User -> reports are visible to only themselves
## Roles -> reports are visible to users having any one of the role of creator
## BackendRoles -> reports are visible to users having any one of the backend role of creator
ignoreRoles: ["own_index", "kibana_user", "reports_full_access", "reports_read_access", "reports_instances_read_access"]
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import org.opensearch.reportsscheduler.action.GetReportDefinitionAction
import org.opensearch.reportsscheduler.action.GetReportInstanceAction
import org.opensearch.reportsscheduler.action.InContextReportCreateAction
import org.opensearch.reportsscheduler.action.OnDemandReportCreateAction
import org.opensearch.reportsscheduler.action.PollReportInstanceAction
import org.opensearch.reportsscheduler.action.UpdateReportDefinitionAction
import org.opensearch.reportsscheduler.action.UpdateReportInstanceStatusAction
import org.opensearch.reportsscheduler.index.ReportDefinitionsIndex
Expand All @@ -47,12 +46,10 @@ import org.opensearch.reportsscheduler.resthandler.OnDemandReportRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportDefinitionListRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportDefinitionRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportInstanceListRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportInstancePollRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportInstanceRestHandler
import org.opensearch.reportsscheduler.resthandler.ReportStatsRestHandler
import org.opensearch.reportsscheduler.scheduler.ReportDefinitionJobParser
import org.opensearch.reportsscheduler.scheduler.ReportDefinitionJobRunner
import org.opensearch.reportsscheduler.settings.LegacyPluginSettings
import org.opensearch.reportsscheduler.settings.PluginSettings

import org.opensearch.action.ActionRequest
Expand Down Expand Up @@ -99,7 +96,6 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
override fun getSettings(): List<Setting<*>> {
val settingList = arrayListOf<Setting<*>>()
settingList.addAll(PluginSettings.getAllSettings())
settingList.addAll(LegacyPluginSettings.getAllSettings())
return settingList
}

Expand Down Expand Up @@ -171,7 +167,6 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
ReportInstanceRestHandler(),
ReportInstanceListRestHandler(),
OnDemandReportRestHandler(),
ReportInstancePollRestHandler(),
ReportStatsRestHandler()
)
}
Expand All @@ -189,7 +184,6 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
ActionPlugin.ActionHandler(GetReportInstanceAction.ACTION_TYPE, GetReportInstanceAction::class.java),
ActionPlugin.ActionHandler(InContextReportCreateAction.ACTION_TYPE, InContextReportCreateAction::class.java),
ActionPlugin.ActionHandler(OnDemandReportCreateAction.ACTION_TYPE, OnDemandReportCreateAction::class.java),
ActionPlugin.ActionHandler(PollReportInstanceAction.ACTION_TYPE, PollReportInstanceAction::class.java),
ActionPlugin.ActionHandler(UpdateReportDefinitionAction.ACTION_TYPE, UpdateReportDefinitionAction::class.java),
ActionPlugin.ActionHandler(UpdateReportInstanceStatusAction.ACTION_TYPE, UpdateReportInstanceStatusAction::class.java)
)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ internal object ReportDefinitionActions {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("Permission denied for Report Definition ${request.reportDefinitionId}", RestStatus.FORBIDDEN)
}
return GetReportDefinitionResponse(reportDefinitionDetails, UserAccessManager.hasAllInfoAccess(user))
return GetReportDefinitionResponse(reportDefinitionDetails, true)
}

/**
Expand Down Expand Up @@ -170,6 +170,6 @@ internal object ReportDefinitionActions {
UserAccessManager.getSearchAccessInfo(user),
request.fromIndex,
request.maxItems)
return GetAllReportDefinitionsResponse(reportDefinitionsList, UserAccessManager.hasAllInfoAccess(user))
return GetAllReportDefinitionsResponse(reportDefinitionsList, true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,15 @@ import org.opensearch.reportsscheduler.model.InContextReportCreateRequest
import org.opensearch.reportsscheduler.model.InContextReportCreateResponse
import org.opensearch.reportsscheduler.model.OnDemandReportCreateRequest
import org.opensearch.reportsscheduler.model.OnDemandReportCreateResponse
import org.opensearch.reportsscheduler.model.PollReportInstanceResponse
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusRequest
import org.opensearch.reportsscheduler.model.UpdateReportInstanceStatusResponse
import org.opensearch.reportsscheduler.security.UserAccessManager
import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.OpenSearchStatusException
import org.opensearch.rest.RestStatus
import java.time.Instant
import kotlin.random.Random

/**
* Report instances index operation actions.
Expand Down Expand Up @@ -85,7 +82,7 @@ internal object ReportInstanceActions {
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
val reportInstanceCopy = reportInstance.copy(id = docId)
return InContextReportCreateResponse(reportInstanceCopy, UserAccessManager.hasAllInfoAccess(user))
return InContextReportCreateResponse(reportInstanceCopy, true)
}

/**
Expand Down Expand Up @@ -126,7 +123,7 @@ internal object ReportInstanceActions {
throw OpenSearchStatusException("Report Instance Creation failed", RestStatus.INTERNAL_SERVER_ERROR)
}
val reportInstanceCopy = reportInstance.copy(id = docId)
return OnDemandReportCreateResponse(reportInstanceCopy, UserAccessManager.hasAllInfoAccess(user))
return OnDemandReportCreateResponse(reportInstanceCopy, true)
}

/**
Expand Down Expand Up @@ -181,7 +178,7 @@ internal object ReportInstanceActions {
Metrics.REPORT_PERMISSION_USER_ERROR.counter.increment()
throw OpenSearchStatusException("Permission denied for Report Definition ${request.reportInstanceId}", RestStatus.FORBIDDEN)
}
return GetReportInstanceResponse(reportInstance, UserAccessManager.hasAllInfoAccess(user))
return GetReportInstanceResponse(reportInstance, true)
}

/**
Expand All @@ -196,41 +193,6 @@ internal object ReportInstanceActions {
UserAccessManager.getSearchAccessInfo(user),
request.fromIndex,
request.maxItems)
return GetAllReportInstancesResponse(reportInstanceList, UserAccessManager.hasAllInfoAccess(user))
}

fun poll(user: User?): PollReportInstanceResponse {
log.info("$LOG_PREFIX:ReportInstance-poll")
UserAccessManager.validatePollingUser(user)
val currentTime = Instant.now()
val reportInstances = ReportInstancesIndex.getPendingReportInstances()
return if (reportInstances.isEmpty()) {
PollReportInstanceResponse(getRetryAfterTime())
} else {
// Shuffle list so that when multiple requests are made, chances of lock conflict is less
reportInstances.shuffle()
/*
If the shuffling is perfect random then there is high probability that first item locking is successful
even when there are many parallel requests. i.e. say there are x jobs and y parallel requests.
then x out of y jobs can lock first item and rest cannot lock any jobs. However shuffle may not be perfect
hence checking first few jobs for locking.
*/
val lockedJob = reportInstances.subList(0, PluginSettings.maxLockRetries).find {
val updatedInstance = it.copy(reportInstance = it.reportInstance.copy(
updatedTime = currentTime,
status = Status.Executing
))
ReportInstancesIndex.updateReportInstanceDoc(updatedInstance)
}
if (lockedJob == null) {
PollReportInstanceResponse(PluginSettings.minPollingDurationSeconds)
} else {
PollReportInstanceResponse(0, lockedJob.reportInstance, UserAccessManager.hasAllInfoAccess(user))
}
}
}

private fun getRetryAfterTime(): Int {
return Random.nextInt(PluginSettings.minPollingDurationSeconds, PluginSettings.maxPollingDurationSeconds)
return GetAllReportInstancesResponse(reportInstanceList, true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,6 @@

package org.opensearch.reportsscheduler.index

import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstance.Status
import org.opensearch.reportsscheduler.model.ReportInstanceDoc
import org.opensearch.reportsscheduler.model.ReportInstanceSearchResults
import org.opensearch.reportsscheduler.model.RestTag.ACCESS_LIST_FIELD
import org.opensearch.reportsscheduler.model.RestTag.STATUS_FIELD
import org.opensearch.reportsscheduler.model.RestTag.TENANT_FIELD
import org.opensearch.reportsscheduler.model.RestTag.UPDATED_TIME_FIELD
import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.reportsscheduler.util.SecureIndexClient
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.create.CreateIndexRequest
Expand All @@ -54,8 +42,17 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.QueryBuilders
import org.opensearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import org.opensearch.reportsscheduler.model.ReportInstance
import org.opensearch.reportsscheduler.model.ReportInstanceDoc
import org.opensearch.reportsscheduler.model.ReportInstanceSearchResults
import org.opensearch.reportsscheduler.model.RestTag.ACCESS_LIST_FIELD
import org.opensearch.reportsscheduler.model.RestTag.TENANT_FIELD
import org.opensearch.reportsscheduler.model.RestTag.UPDATED_TIME_FIELD
import org.opensearch.reportsscheduler.settings.PluginSettings
import org.opensearch.reportsscheduler.util.SecureIndexClient
import org.opensearch.reportsscheduler.util.logger
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant
import java.util.concurrent.TimeUnit

/**
Expand Down Expand Up @@ -256,42 +253,4 @@ internal object ReportInstancesIndex {
}
return response.result == DocWriteResponse.Result.DELETED
}

/**
* Get pending report instances
* @return ReportInstanceDoc list
*/
fun getPendingReportInstances(): MutableList<ReportInstanceDoc> {
createIndex()
val query = QueryBuilders.termsQuery(STATUS_FIELD,
Status.Scheduled.name,
Status.Executing.name
)
val sourceBuilder = SearchSourceBuilder()
.timeout(TimeValue(PluginSettings.operationTimeoutMs, TimeUnit.MILLISECONDS))
.size(PluginSettings.defaultItemsQueryCount)
.query(query)
val searchRequest = SearchRequest()
.indices(REPORT_INSTANCES_INDEX_NAME)
.source(sourceBuilder)
val actionFuture = client.search(searchRequest)
val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs)
val hits = response.hits
log.info("$LOG_PREFIX:getPendingReportInstances; totalHits:${hits.totalHits}, retHits:${hits.hits.size}")
val mutableList: MutableList<ReportInstanceDoc> = mutableListOf()
val currentTime = Instant.now()
val refTime = currentTime.minusSeconds(PluginSettings.jobLockDurationSeconds.toLong())
hits.forEach {
val parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE,
it.sourceAsString)
parser.nextToken()
val reportInstance = ReportInstance.parse(parser, it.id)
if (reportInstance.status == Status.Scheduled || // If still in Scheduled state
reportInstance.updatedTime.isBefore(refTime)) { // or when timeout happened
mutableList.add(ReportInstanceDoc(reportInstance, it.seqNo, it.primaryTerm))
}
}
return mutableList
}
}
Loading

0 comments on commit c3c8e2f

Please sign in to comment.