Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Commit

Permalink
Removed IndexManager and making operation directly to Index classes
Browse files Browse the repository at this point in the history
  • Loading branch information
akbhatta committed Nov 3, 2020
1 parent 6c11c7f commit a5b1a86
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ package com.amazon.opendistroforelasticsearch.reportsscheduler
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobSchedulerExtension
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.IndexManager
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportDefinitionsIndex
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportInstancesIndex
import com.amazon.opendistroforelasticsearch.reportsscheduler.job.ReportsSchedulerJobRunnerProxy
import com.amazon.opendistroforelasticsearch.reportsscheduler.job.ScheduledReportJobParser
import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.OnDemandReportRestHandler
Expand Down Expand Up @@ -98,7 +99,8 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
): Collection<Any> {
this.clusterService = clusterService
PluginSettings.addSettingsUpdateConsumer(clusterService)
IndexManager.initialize(client, clusterService)
ReportDefinitionsIndex.initialize(client, clusterService)
ReportInstancesIndex.initialize(client, clusterService)
jobRunner.createRunnerInstance(clusterService, threadPool, client)
return emptyList()
}
Expand All @@ -114,7 +116,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension {
* {@inheritDoc}
*/
override fun getJobIndex(): String {
return JOB_INDEX_NAME // return ReportDefinitionsIndex.REPORT_DEFINITIONS_INDEX_NAME
return JOB_INDEX_NAME // return REPORT_DEFINITIONS_INDEX_NAME
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.amazon.opendistroforelasticsearch.reportsscheduler.action

import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.IndexManager
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportDefinitionsIndex
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.CreateReportDefinitionRequest
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.CreateReportDefinitionResponse
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.DeleteReportDefinitionRequest
Expand Down Expand Up @@ -54,7 +54,7 @@ internal object ReportDefinitionActions {
listOf(TEMP_ROLE_ID), // TODO update with actual requester ID
request.reportDefinition
)
val docId = IndexManager.createReportDefinition(reportDefinitionDetails)
val docId = ReportDefinitionsIndex.createReportDefinition(reportDefinitionDetails)
return if (docId == null) {
CreateReportDefinitionResponse(RestStatus.INTERNAL_SERVER_ERROR,
"Report Definition Creation failed",
Expand All @@ -73,7 +73,7 @@ internal object ReportDefinitionActions {
*/
fun update(request: UpdateReportDefinitionRequest): UpdateReportDefinitionResponse {
log.info("$LOG_PREFIX:ReportDefinition-update ${request.reportDefinitionId}")
val currentReportDefinitionDetails = IndexManager.getReportDefinition(request.reportDefinitionId)
val currentReportDefinitionDetails = ReportDefinitionsIndex.getReportDefinition(request.reportDefinitionId)
return if (currentReportDefinitionDetails == null) { // TODO verify actual requester ID
UpdateReportDefinitionResponse(RestStatus.NOT_FOUND,
"Report Definition ${request.reportDefinitionId} not found",
Expand All @@ -86,7 +86,7 @@ internal object ReportDefinitionActions {
currentReportDefinitionDetails.roles,
request.reportDefinition
)
val isUpdated = IndexManager.updateReportDefinition(request.reportDefinitionId, reportDefinitionDetails)
val isUpdated = ReportDefinitionsIndex.updateReportDefinition(request.reportDefinitionId, reportDefinitionDetails)
if (isUpdated) {
UpdateReportDefinitionResponse(RestStatus.OK,
null,
Expand All @@ -106,7 +106,7 @@ internal object ReportDefinitionActions {
*/
fun info(request: GetReportDefinitionRequest): GetReportDefinitionsResponse {
log.info("$LOG_PREFIX:ReportDefinition-info ${request.reportDefinitionId}")
val reportDefinitionDetails = IndexManager.getReportDefinition(request.reportDefinitionId)
val reportDefinitionDetails = ReportDefinitionsIndex.getReportDefinition(request.reportDefinitionId)
return if (reportDefinitionDetails == null) { // TODO verify actual requester ID
GetReportDefinitionsResponse(RestStatus.NOT_FOUND,
"Report Definition ${request.reportDefinitionId} not found",
Expand All @@ -125,13 +125,13 @@ internal object ReportDefinitionActions {
*/
fun delete(request: DeleteReportDefinitionRequest): DeleteReportDefinitionResponse {
log.info("$LOG_PREFIX:ReportDefinition-delete ${request.reportDefinitionId}")
val reportDefinitionDetails = IndexManager.getReportDefinition(request.reportDefinitionId)
val reportDefinitionDetails = ReportDefinitionsIndex.getReportDefinition(request.reportDefinitionId)
return if (reportDefinitionDetails == null) { // TODO verify actual requester ID
DeleteReportDefinitionResponse(RestStatus.NOT_FOUND,
"Report Definition ${request.reportDefinitionId} not found",
null)
} else {
val isDeleted = IndexManager.deleteReportDefinition(request.reportDefinitionId)
val isDeleted = ReportDefinitionsIndex.deleteReportDefinition(request.reportDefinitionId)
if (!isDeleted) {
DeleteReportDefinitionResponse(RestStatus.REQUEST_TIMEOUT,
"Report Definition ${request.reportDefinitionId} delete failed",
Expand All @@ -152,7 +152,7 @@ internal object ReportDefinitionActions {
fun getAll(request: GetAllReportDefinitionsRequest): GetAllReportDefinitionsResponse {
log.info("$LOG_PREFIX:ReportDefinition-getAll ${request.fromIndex}")
// TODO verify actual requester ID
val reportDefinitionsList = IndexManager.getAllReportDefinitions(listOf(TEMP_ROLE_ID), request.fromIndex)
val reportDefinitionsList = ReportDefinitionsIndex.getAllReportDefinitions(listOf(TEMP_ROLE_ID), request.fromIndex)
return if (reportDefinitionsList.isEmpty()) {
GetAllReportDefinitionsResponse(RestStatus.NOT_FOUND,
"No Report Definitions found",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package com.amazon.opendistroforelasticsearch.reportsscheduler.action

import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.IndexManager
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportDefinitionsIndex
import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportInstancesIndex
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.GetAllReportInstancesRequest
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.GetAllReportInstancesResponse
import com.amazon.opendistroforelasticsearch.reportsscheduler.model.GetReportInstanceRequest
Expand Down Expand Up @@ -62,7 +63,7 @@ internal object ReportInstanceActions {
request.status,
request.statusText,
request.inContextDownloadUrlPath)
val docId = IndexManager.createReportInstance(reportInstance)
val docId = ReportInstancesIndex.createReportInstance(reportInstance)
return if (docId == null) {
InContextReportCreateResponse(RestStatus.INTERNAL_SERVER_ERROR,
"Report Instance Creation failed",
Expand All @@ -83,7 +84,7 @@ internal object ReportInstanceActions {
fun createOnDemandFromDefinition(request: OnDemandReportCreateRequest): OnDemandReportCreateResponse {
log.info("$LOG_PREFIX:ReportInstance-createOnDemandFromDefinition ${request.reportDefinitionId}")
val currentTime = Instant.now()
val reportDefinitionDetails = IndexManager.getReportDefinition(request.reportDefinitionId)
val reportDefinitionDetails = ReportDefinitionsIndex.getReportDefinition(request.reportDefinitionId)
return if (reportDefinitionDetails == null) { // TODO verify actual requester ID
OnDemandReportCreateResponse(RestStatus.INTERNAL_SERVER_ERROR,
"Report Definition ${request.reportDefinitionId} not found",
Expand All @@ -100,7 +101,7 @@ internal object ReportInstanceActions {
reportDefinitionDetails.roles,
reportDefinitionDetails,
currentStatus)
val docId = IndexManager.createReportInstance(reportInstance)
val docId = ReportInstancesIndex.createReportInstance(reportInstance)
if (docId == null) {
OnDemandReportCreateResponse(RestStatus.INTERNAL_SERVER_ERROR,
"Report Instance Creation failed",
Expand All @@ -121,7 +122,7 @@ internal object ReportInstanceActions {
*/
fun update(request: UpdateReportInstanceStatusRequest): UpdateReportInstanceStatusResponse {
log.info("$LOG_PREFIX:ReportInstance-update ${request.reportInstanceId}")
val currentReportInstance = IndexManager.getReportInstance(request.reportInstanceId)
val currentReportInstance = ReportInstancesIndex.getReportInstance(request.reportInstanceId)
return if (currentReportInstance == null) { // TODO verify actual requester ID
UpdateReportInstanceStatusResponse(RestStatus.NOT_FOUND,
"Report Instance not found",
Expand All @@ -135,7 +136,7 @@ internal object ReportInstanceActions {
val updatedReportInstance = currentReportInstance.copy(updatedTime = currentTime,
status = request.status,
statusText = request.statusText)
val isUpdated = IndexManager.updateReportInstance(updatedReportInstance)
val isUpdated = ReportInstancesIndex.updateReportInstance(updatedReportInstance)
if (isUpdated) {
UpdateReportInstanceStatusResponse(RestStatus.OK,
null,
Expand All @@ -155,7 +156,7 @@ internal object ReportInstanceActions {
*/
fun info(request: GetReportInstanceRequest): GetReportInstanceResponse {
log.info("$LOG_PREFIX:ReportInstance-info ${request.reportInstanceId}")
val reportInstance = IndexManager.getReportInstance(request.reportInstanceId)
val reportInstance = ReportInstancesIndex.getReportInstance(request.reportInstanceId)
return if (reportInstance == null) { // TODO verify actual requester ID
GetReportInstanceResponse(RestStatus.NOT_FOUND,
"Report Instance ${request.reportInstanceId} not found",
Expand All @@ -175,7 +176,7 @@ internal object ReportInstanceActions {
fun getAll(request: GetAllReportInstancesRequest): GetAllReportInstancesResponse {
log.info("$LOG_PREFIX:ReportInstance-getAll ${request.fromIndex}")
// TODO verify actual requester ID
val reportInstanceList = IndexManager.getAllReportInstances(listOf(TEMP_ROLE_ID), request.fromIndex)
val reportInstanceList = ReportInstancesIndex.getAllReportInstances(listOf(TEMP_ROLE_ID), request.fromIndex)
return if (reportInstanceList.isEmpty()) {
GetAllReportInstancesResponse(RestStatus.NOT_FOUND,
"No Report Instances found",
Expand All @@ -189,7 +190,7 @@ internal object ReportInstanceActions {
log.info("$LOG_PREFIX:ReportInstance-poll")
val currentTime = Instant.now()
// TODO verify actual requester ID to be kibana background task
val reportInstances = IndexManager.getPendingReportInstances()
val reportInstances = ReportInstancesIndex.getPendingReportInstances()
return if (reportInstances.isEmpty()) {
PollReportInstanceResponse(RestStatus.MULTI_STATUS,
"No Scheduled Report Instance found",
Expand All @@ -209,7 +210,7 @@ internal object ReportInstanceActions {
updatedTime = currentTime,
status = Status.Executing
))
IndexManager.updateReportInstanceDoc(updatedInstance)
ReportInstancesIndex.updateReportInstanceDoc(updatedInstance)
}
if (lockedJob == null) {
PollReportInstanceResponse(RestStatus.MULTI_STATUS,
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit a5b1a86

Please sign in to comment.