From 36343e68d988f4674df25a1300966f6b6856d40a Mon Sep 17 00:00:00 2001 From: Anantha Krishna Bhatta Date: Tue, 10 Nov 2020 10:46:23 -0800 Subject: [PATCH] Removed deprecated APIs and cleaned up code. --- .../action/AbstractActionHandler.java | 66 ------ .../action/ReportsJobActionHandler.java | 203 ------------------ .../action/ReportsScheduleActionHandler.java | 127 ----------- .../job/ReportsSchedulerJobRunner.java | 103 --------- .../job/ReportsSchedulerJobRunnerProxy.java | 90 -------- .../job/ScheduledReportJobParser.java | 93 -------- .../job/parameter/JobConstant.java | 25 --- .../job/parameter/JobParameter.java | 110 ---------- .../ReportsSchedulerPlugin.kt | 16 +- .../resthandler/ReportsJobRestHandler.kt | 81 ------- .../resthandler/ReportsScheduleRestHandler.kt | 79 ------- 11 files changed, 1 insertion(+), 992 deletions(-) delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/AbstractActionHandler.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsJobActionHandler.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsScheduleActionHandler.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunner.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunnerProxy.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ScheduledReportJobParser.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java delete mode 100644 reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobParameter.java delete mode 100644 reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsJobRestHandler.kt delete mode 100644 reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsScheduleRestHandler.kt diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/AbstractActionHandler.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/AbstractActionHandler.java deleted file mode 100644 index 4c60cbbd..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/AbstractActionHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.action; - -import java.io.IOException; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; - -/** Action handler to process REST request and handle failures. */ -public abstract class AbstractActionHandler { - private final NodeClient client; - private final RestChannel channel; - private final Logger logger = LogManager.getLogger(AbstractActionHandler.class); - - protected NodeClient getClient() { - return client; - } - - protected RestChannel getChannel() { - return channel; - } - - /** - * Constructor function. - * - * @param client ES node client that executes actions on the local node - * @param channel ES channel used to construct bytes / builder based outputs, and send responses - */ - public AbstractActionHandler(NodeClient client, RestChannel channel) { - this.client = client; - this.channel = channel; - } - - /** - * Send failure message via channel. - * - * @param e exception - */ - public void onFailure(Exception e) { - if (e != null) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (IOException e1) { - logger.warn("Fail to send out failure message of exception", e); - } - } - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsJobActionHandler.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsJobActionHandler.java deleted file mode 100644 index abf20d4c..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsJobActionHandler.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.action; - -import java.io.IOException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Queue; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.builder.SearchSourceBuilder; - -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService; -import com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter.JobParameter; - -import static com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.JOB_QUEUE_INDEX_NAME; -import static com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.LOCK_DURATION_SECONDS; - -public class ReportsJobActionHandler extends AbstractActionHandler { - private static final Logger log = LogManager.getLogger(ReportsJobActionHandler.class); - private final LockService lockService; - private final NodeClient client; - private final RestChannel channel; - - /** - * Constructor function. - * - * @param client ES node client that executes actions on the local node - * @param channel ES channel used to construct bytes / builder based outputs, and send responses - * @param clusterService ES cluster service - */ - public ReportsJobActionHandler( - NodeClient client, RestChannel channel, ClusterService clusterService) { - super(client, channel); - this.lockService = new LockService(client, clusterService); - this.client = getClient(); - this.channel = getChannel(); - } - - public void getJob() { - // get all jobs from job_queue index - final SearchRequest searchRequest = new SearchRequest(JOB_QUEUE_INDEX_NAME); - final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(QueryBuilders.matchAllQuery()); - - // // sort the document by enqueue_time - // searchSourceBuilder.sort(new FieldSortBuilder("enqueue_time").order(SortOrder.DESC)); - searchRequest.source(searchSourceBuilder); - client.search( - searchRequest, - ActionListener.wrap( - response -> onSearchJobResponse(response), exception -> onFailure(exception))); - } - - private void onSearchJobResponse(SearchResponse response) { - final SearchHits hits = response.getHits(); - final SearchHit[] searchHits = hits.getHits(); - final List searchHitsList = Arrays.asList(searchHits); - // randomize the jobs, for possible faster job retrieval - Randomness.shuffle(searchHitsList); - final Queue searchHitsQueue = new LinkedList<>(searchHitsList); - - findFirstAvailableJob(searchHitsQueue); - } - - /** - * Attempt to find single jobs that is not being locked, and send the job information back to - * reporting core for job execution. - * - * @param searchHitsQueue a queue which saves all jobs from .reports_scheduler_job_queue index - */ - private void findFirstAvailableJob(Queue searchHitsQueue) { - final SearchHit hit = searchHitsQueue.poll(); - - if (hit != null) { - String jobId = hit.getId(); - // set up "fake" jobParamater and jobContext that is required to initialize lockService - final JobParameter jobParameter = - new JobParameter(null, null, null, false, null, null, LOCK_DURATION_SECONDS); - final JobExecutionContext jobExecutionContext = - new JobExecutionContext(null, null, null, JOB_QUEUE_INDEX_NAME, jobId); - - lockService.acquireLock( - jobParameter, - jobExecutionContext, - ActionListener.wrap( - lock -> { - if (lock == null) { - findFirstAvailableJob(searchHitsQueue); - } else { - log.info("send job data(report_definition_id) to reporting core for execution"); - - final RestResponse restResponse = - new BytesRestResponse( - RestStatus.OK, hit.toXContent(JsonXContent.contentBuilder(), null)); - channel.sendResponse(restResponse); - } - }, - exception -> { - channel.sendResponse( - new BytesRestResponse( - RestStatus.INTERNAL_SERVER_ERROR, "Failed to acquire lock")); - log.debug("Failed tp acquire lock " + exception); - })); - } else { - log.info("Kibana is polling, but currently no job to execute"); - channel.sendResponse(new BytesRestResponse(RestStatus.NO_CONTENT, "No jobs to execute")); - } - } - - /** - * Once the report core(Kibana side) is done executing job, this function will be called to - * delete/release the lock, remove the job from job queue(ES index) - * - * @param jobId the id of the scheduled job - */ - public void updateJob(String jobId) { - // the lockId format is required by lockService to avoid conflict - final String lockId = JOB_QUEUE_INDEX_NAME + "-" + jobId; - - // remove job from queue - final DeleteRequest deleteRequest = new DeleteRequest().index(JOB_QUEUE_INDEX_NAME).id(jobId); - - client.delete( - deleteRequest, - new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - channel.sendResponse( - new BytesRestResponse( - RestStatus.NOT_FOUND, - String.format(Locale.ROOT, "Job id %s doesn't exist", jobId))); - } else if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) { - // delete lock of that job - lockService.deleteLock( - lockId, - ActionListener.wrap( - deleted -> { - log.debug("Deleted lock: {}", deleted); - channel.sendResponse( - new BytesRestResponse( - RestStatus.OK, - String.format( - Locale.ROOT, - "Job removed from jobs queue. Job id: %s", - jobId))); - }, - exception -> log.debug("Failed to delete lock", exception))); - } - } - - @Override - public void onFailure(Exception e) { - final RestStatus statusCode; - if (e instanceof IOException) { - statusCode = RestStatus.BAD_GATEWAY; - } else if (e instanceof ElasticsearchException) { - statusCode = RestStatus.SERVICE_UNAVAILABLE; - } else { - statusCode = RestStatus.INTERNAL_SERVER_ERROR; - } - channel.sendResponse(new BytesRestResponse(statusCode, e.getMessage())); - } - }); - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsScheduleActionHandler.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsScheduleActionHandler.java deleted file mode 100644 index 1694da60..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/action/ReportsScheduleActionHandler.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.action; - -import java.io.IOException; -import java.util.Locale; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.common.xcontent.json.JsonXContent; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; - -import static com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.JOB_INDEX_NAME; - -public class ReportsScheduleActionHandler extends AbstractActionHandler { - private static final Logger log = LogManager.getLogger(ReportsScheduleActionHandler.class); - private final NodeClient client; - private final RestChannel channel; - - public ReportsScheduleActionHandler(NodeClient client, RestChannel channel) { - super(client, channel); - this.client = getClient(); - this.channel = getChannel(); - } - - public void createSchedule(String jobId, RestRequest request) { - final IndexRequest indexRequest = - new IndexRequest() - .index(JOB_INDEX_NAME) - .id(jobId) - .source(request.requiredContent(), XContentType.JSON); - - // index the job parameter - client.index( - indexRequest, - new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - try { - final RestResponse restResponse = - new BytesRestResponse( - RestStatus.OK, indexResponse.toXContent(JsonXContent.contentBuilder(), null)); - channel.sendResponse(restResponse); - } catch (IOException e) { - channel.sendResponse( - new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); - } - } - - @Override - public void onFailure(Exception e) { - final RestStatus statusCode; - if (e instanceof IOException) { - statusCode = RestStatus.BAD_GATEWAY; - } else if (e instanceof ElasticsearchException) { - statusCode = RestStatus.SERVICE_UNAVAILABLE; - } else { - statusCode = RestStatus.INTERNAL_SERVER_ERROR; - } - channel.sendResponse(new BytesRestResponse(statusCode, e.getMessage())); - } - }); - } - - public void deleteSchedule(String jobId) { - final DeleteRequest deleteRequest = new DeleteRequest().index(JOB_INDEX_NAME).id(jobId); - - client.delete( - deleteRequest, - new ActionListener() { - @Override - public void onResponse(DeleteResponse deleteResponse) { - if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) { - channel.sendResponse( - new BytesRestResponse( - RestStatus.NOT_FOUND, - String.format(Locale.ROOT, "Job id %s doesn't exist", jobId))); - } else if (deleteResponse.getResult() == DocWriteResponse.Result.DELETED) { - channel.sendResponse( - new BytesRestResponse( - RestStatus.OK, - String.format(Locale.ROOT, "Job deleted. Job id: %s", jobId))); - } - } - - @Override - public void onFailure(Exception e) { - final RestStatus statusCode; - if (e instanceof IOException) { - statusCode = RestStatus.BAD_GATEWAY; - } else if (e instanceof ElasticsearchException) { - statusCode = RestStatus.SERVICE_UNAVAILABLE; - } else { - statusCode = RestStatus.INTERNAL_SERVER_ERROR; - } - channel.sendResponse(new BytesRestResponse(statusCode, e.getMessage())); - } - }); - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunner.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunner.java deleted file mode 100644 index b5afc62e..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunner.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.job; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.threadpool.ThreadPool; - -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner; -import com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter.JobParameter; - -import static com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.JOB_QUEUE_INDEX_NAME; - -/** - * This reports scheduler job runner will add a scheduled job to a queue index once it gets - * triggered. - */ -public class ReportsSchedulerJobRunner implements ScheduledJobRunner { - private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); - - private final ClusterService clusterService; - private final ThreadPool threadPool; - private final Client client; - - public ReportsSchedulerJobRunner( - ClusterService clusterService, ThreadPool threadPool, Client client) { - if (clusterService == null) { - throw new IllegalArgumentException("ClusterService is not initialized"); - } - - if (threadPool == null) { - throw new IllegalArgumentException("ThreadPool is not initialized"); - } - - this.clusterService = clusterService; - this.threadPool = threadPool; - this.client = client; - } - - @Override - public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { - if (!(jobParameter instanceof JobParameter)) { - throw new IllegalStateException( - "Job parameter is not instance of JobParameter, type: " - + jobParameter.getClass().getCanonicalName()); - } - - Runnable runnable = - () -> { - final JobParameter parameter = (JobParameter) jobParameter; - final String reportDefinitionId = parameter.getReportDefinitionId(); - - // compose json and save into job queue index - final Map jsonMap = new HashMap<>(); - jsonMap.put("report_definition_id", reportDefinitionId); - jsonMap.put("triggered_time", context.getExpectedExecutionTime().toEpochMilli()); - - final IndexRequest indexRequest = - new IndexRequest().index(JOB_QUEUE_INDEX_NAME).id(reportDefinitionId).source(jsonMap); - - client.index( - indexRequest, - new ActionListener() { - @Override - public void onResponse(IndexResponse indexResponse) { - log.info( - "Scheduled job triggered and add to job queue index, waiting to be picked up by reporting core poller"); - } - - @Override - public void onFailure(Exception e) { - log.error( - "Scheduled job gets triggered but fail to add to job queue index " - + e.toString()); - } - }); - }; - threadPool.generic().submit(runnable); - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunnerProxy.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunnerProxy.java deleted file mode 100644 index 21a0e037..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ReportsSchedulerJobRunnerProxy.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.job; - -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobExecutionContext; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobRunner; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.client.Client; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.threadpool.ThreadPool; - -/** - * Reports scheduler job runner proxy class. - * - *

The job runner should be a singleton class if it uses Elasticsearch client or other objects - * passed from Elasticsearch. Because when registering the job runner to JobScheduler plugin, - * Elasticsearch has not invoke plugins' createComponents() method. That is saying the plugin is not - * completely initialized, and the Elasticsearch {@link Client}, {@link ClusterService} and other - * objects are not available to plugin and this job runner. - * - *

So we have to move this job runner initialization to {@link Plugin} createComponents() method, - * and using singleton job runner to ensure we register a usable job runner instance to JobScheduler - * plugin. - * - */ -public class ReportsSchedulerJobRunnerProxy implements ScheduledJobRunner { - private static final Logger log = LogManager.getLogger(ScheduledJobRunner.class); - private static ReportsSchedulerJobRunnerProxy INSTANCE; - private ReportsSchedulerJobRunner implementation; - - public static ReportsSchedulerJobRunnerProxy getJobRunnerInstance() { - if (INSTANCE != null) { - return INSTANCE; - } - synchronized (ReportsSchedulerJobRunnerProxy.class) { - if (INSTANCE != null) { - return INSTANCE; - } - INSTANCE = new ReportsSchedulerJobRunnerProxy(); - return INSTANCE; - } - } - - public void createRunnerInstance( - ClusterService clusterClient, ThreadPool threadPool, Client client) { - if (implementation != null) { - return; - } - synchronized (ReportsSchedulerJobRunnerProxy.class) { - if (implementation != null) { - return; - } - implementation = new ReportsSchedulerJobRunner(clusterClient, threadPool, client); - } - } - - private ReportsSchedulerJobRunnerProxy() { - // Singleton class, use getJobRunner method instead of constructor - } - - @Override - public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { - final ReportsSchedulerJobRunner local; - synchronized (ReportsSchedulerJobRunnerProxy.class) { - local = implementation; - } - - if (local != null) { - local.runJob(jobParameter, context); - } else { - log.error("Job runner is called before creating instance"); - } - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ScheduledReportJobParser.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ScheduledReportJobParser.java deleted file mode 100644 index b8a31af0..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/ScheduledReportJobParser.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.job; - -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.JobDocVersion; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParser; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.ScheduleParser; -import com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter.JobConstant; -import com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter.JobParameter; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentParserUtils; - -import java.io.IOException; -import java.time.Instant; - -public class ScheduledReportJobParser implements ScheduledJobParser { - - @Override - public ScheduledJobParameter parse(XContentParser parser, String id, JobDocVersion jobDocVersion) throws IOException { - String jobName = null; - Instant enabledTime = null; - String reportDefinitionId = null; - boolean isEnabled = false; - Schedule schedule = null; - Instant lastUpdateTime = null; - Long lockDurationSeconds = null; - - XContentParserUtils.ensureExpectedToken( - XContentParser.Token.START_OBJECT, parser.nextToken(), parser::getTokenLocation); - - while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) { - String fieldName = parser.currentName(); - parser.nextToken(); - switch (fieldName) { - case JobConstant.NAME_FIELD: - jobName = parser.text(); - break; - case JobConstant.ENABLED_FILED: - isEnabled = parser.booleanValue(); - break; - case JobConstant.ENABLED_TIME_FILED: - enabledTime = parseInstantValue(parser); - break; - case JobConstant.LAST_UPDATE_TIME_FIELD: - lastUpdateTime = parseInstantValue(parser); - break; - case JobConstant.SCHEDULE_FIELD: - schedule = ScheduleParser.parse(parser); - break; - case JobConstant.REPORT_DEFINITION_ID: - reportDefinitionId = parser.text(); - break; - default: - XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); - } - } - return new JobParameter( - jobName, - enabledTime, - reportDefinitionId, - isEnabled, - schedule, - lastUpdateTime, - lockDurationSeconds); - } - - private Instant parseInstantValue(XContentParser parser) throws IOException { - if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) { - return null; - } - if (parser.currentToken().isValue()) { - return Instant.ofEpochMilli(parser.longValue()); - } - XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation()); - return null; - } -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java deleted file mode 100644 index 26831b8d..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter; - -public class JobConstant { - public static final String NAME_FIELD = "name"; - public static final String ENABLED_FILED = "enabled"; - public static final String LAST_UPDATE_TIME_FIELD = "last_update_time"; - public static final String SCHEDULE_FIELD = "schedule"; - public static final String ENABLED_TIME_FILED = "enabled_time"; - public static final String REPORT_DEFINITION_ID = "report_definition_id"; -} diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobParameter.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobParameter.java deleted file mode 100644 index 60977491..00000000 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobParameter.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.reportsscheduler.job.parameter; - -import java.io.IOException; -import java.time.Instant; - -import org.elasticsearch.common.xcontent.XContentBuilder; - -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.ScheduledJobParameter; -import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.Schedule; - -/** - * A report scheduled job parameter. - * - *

It adds an additional "reportDefinition" field to {@link ScheduledJobParameter}, which stores - * the index the job runner will watch. - */ -public class JobParameter implements ScheduledJobParameter { - - private final String jobName; - private final Instant enabledTime; - private final String reportDefinitionId; - private final boolean isEnabled; - private final Schedule schedule; - private final Instant lastUpdateTime; - private final Long lockDurationSeconds; - - public JobParameter( - String jobName, - Instant enabledTime, - String reportDefinitionId, - boolean isEnabled, - Schedule schedule, - Instant lastUpdateTime, - Long lockDurationSeconds) { - this.jobName = jobName; - this.enabledTime = enabledTime; - this.reportDefinitionId = reportDefinitionId; - this.isEnabled = isEnabled; - this.schedule = schedule; - this.lastUpdateTime = lastUpdateTime; - this.lockDurationSeconds = lockDurationSeconds; - } - - @Override - public String getName() { - return this.jobName; - } - - @Override - public Instant getLastUpdateTime() { - return this.lastUpdateTime; - } - - @Override - public Instant getEnabledTime() { - return this.enabledTime; - } - - @Override - public Schedule getSchedule() { - return this.schedule; - } - - @Override - public boolean isEnabled() { - return this.isEnabled; - } - - @Override - public Long getLockDurationSeconds() { - return this.lockDurationSeconds; - } - - public String getReportDefinitionId() { - return this.reportDefinitionId; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder - .field(JobConstant.NAME_FIELD, this.jobName) - .field(JobConstant.ENABLED_FILED, this.isEnabled) - .field(JobConstant.SCHEDULE_FIELD, this.schedule) - .field(JobConstant.REPORT_DEFINITION_ID, this.reportDefinitionId); - if (this.enabledTime != null) { - builder.timeField( - JobConstant.ENABLED_TIME_FILED, - JobConstant.ENABLED_TIME_FILED, - this.enabledTime.toEpochMilli()); - } - builder.endObject(); - return builder; - } -} diff --git a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt index efc15c95..996c62e1 100644 --- a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -32,15 +32,12 @@ import com.amazon.opendistroforelasticsearch.reportsscheduler.action.UpdateRepor import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportDefinitionsIndex import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportDefinitionsIndex.REPORT_DEFINITIONS_INDEX_NAME import com.amazon.opendistroforelasticsearch.reportsscheduler.index.ReportInstancesIndex -import com.amazon.opendistroforelasticsearch.reportsscheduler.job.ReportsSchedulerJobRunnerProxy import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.OnDemandReportRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportDefinitionListRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportDefinitionRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportInstanceListRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportInstancePollRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportInstanceRestHandler -import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportsJobRestHandler -import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportsScheduleRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.scheduler.ReportDefinitionJobParser import com.amazon.opendistroforelasticsearch.reportsscheduler.scheduler.ReportDefinitionJobRunner import com.amazon.opendistroforelasticsearch.reportsscheduler.settings.PluginSettings @@ -79,16 +76,9 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { companion object { const val PLUGIN_NAME = "opendistro-reports-scheduler" const val LOG_PREFIX = "reports" - const val BASE_SCHEDULER_URI = "/_opendistro/reports_scheduler" const val BASE_REPORTS_URI = "/_opendistro/_reports" - const val JOB_INDEX_NAME = ".reports_scheduler" - const val JOB_QUEUE_INDEX_NAME = ".reports_scheduler_job_queue" - const val LOCK_DURATION_SECONDS = 300L } - private val jobRunner = ReportsSchedulerJobRunnerProxy.getJobRunnerInstance() - private lateinit var clusterService: ClusterService // initialized in createComponents() - /** * {@inheritDoc} */ @@ -112,11 +102,9 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier ): Collection { - this.clusterService = clusterService PluginSettings.addSettingsUpdateConsumer(clusterService) ReportDefinitionsIndex.initialize(client, clusterService) ReportInstancesIndex.initialize(client, clusterService) - jobRunner.createRunnerInstance(clusterService, threadPool, client) return emptyList() } @@ -166,9 +154,7 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { ReportInstanceRestHandler(), ReportInstanceListRestHandler(), OnDemandReportRestHandler(), - ReportsScheduleRestHandler(), - ReportInstancePollRestHandler(), - ReportsJobRestHandler(clusterService) + ReportInstancePollRestHandler() ) } diff --git a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsJobRestHandler.kt b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsJobRestHandler.kt deleted file mode 100644 index 21aa7d0e..00000000 --- a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsJobRestHandler.kt +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ -package com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler - -import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.BASE_SCHEDULER_URI -import com.amazon.opendistroforelasticsearch.reportsscheduler.action.ReportsJobActionHandler -import com.google.common.collect.ImmutableList -import org.elasticsearch.client.node.NodeClient -import org.elasticsearch.cluster.service.ClusterService -import org.elasticsearch.rest.BaseRestHandler -import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer -import org.elasticsearch.rest.BytesRestResponse -import org.elasticsearch.rest.RestChannel -import org.elasticsearch.rest.RestHandler -import org.elasticsearch.rest.RestRequest -import org.elasticsearch.rest.RestStatus - -/** - * Rest handler for polling/managing jobs. - * This handler [ReportsJobActionHandler] for job handling. - */ -internal class ReportsJobRestHandler(private val clusterService: ClusterService) : BaseRestHandler() { - companion object { - private const val SCHEDULER_JOB_ACTION = "reports_scheduler_job_action" - private const val JOB = "job" - private const val JOB_ID = "job_id" - private const val JOB_ID_URL = "$BASE_SCHEDULER_URI/$JOB/{$JOB_ID}" - private const val JOB_URL = "$BASE_SCHEDULER_URI/$JOB" - } - - /** - * {@inheritDoc} - */ - override fun getName(): String { - return SCHEDULER_JOB_ACTION - } - - /** - * {@inheritDoc} - */ - override fun routes(): List { - return ImmutableList.of( - // update job status, release lock and remove job from queue. - // POST /_opendistro/reports_scheduler/job/{job_id} - RestHandler.Route(RestRequest.Method.POST, JOB_ID_URL), - // get triggered jobs from jobs queue. - // GET /_opendistro/reports_scheduler/job - RestHandler.Route(RestRequest.Method.GET, JOB_URL) - ) - } - - /** - * {@inheritDoc} - */ - override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val jobId = request.param(JOB_ID) - return RestChannelConsumer { channel: RestChannel -> - val handler = ReportsJobActionHandler(client, channel, clusterService) - when { - request.method() == RestRequest.Method.POST -> handler.updateJob(jobId) - request.method() == RestRequest.Method.GET -> handler.getJob() - else -> channel.sendResponse( - BytesRestResponse( - RestStatus.METHOD_NOT_ALLOWED, "${request.method()} is not allowed.")) - } - } - } -} diff --git a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsScheduleRestHandler.kt b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsScheduleRestHandler.kt deleted file mode 100644 index 44e51f8b..00000000 --- a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/resthandler/ReportsScheduleRestHandler.kt +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ -package com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler - -import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.BASE_SCHEDULER_URI -import com.amazon.opendistroforelasticsearch.reportsscheduler.action.ReportsScheduleActionHandler -import com.google.common.collect.ImmutableList -import org.elasticsearch.client.node.NodeClient -import org.elasticsearch.rest.BaseRestHandler -import org.elasticsearch.rest.BaseRestHandler.RestChannelConsumer -import org.elasticsearch.rest.BytesRestResponse -import org.elasticsearch.rest.RestChannel -import org.elasticsearch.rest.RestHandler -import org.elasticsearch.rest.RestRequest -import org.elasticsearch.rest.RestStatus - -/** - * Rest handler for scheduling reports. - * This handler [ReportsScheduleActionHandler] for scheduling. - */ -internal class ReportsScheduleRestHandler : BaseRestHandler() { - companion object { - private const val SCHEDULER_SCHEDULE_ACTION = "reports_scheduler_schedule_action" - private const val SCHEDULE = "schedule" - private const val JOB_ID = "job_id" - private const val SCHEDULE_URL = "$BASE_SCHEDULER_URI/$SCHEDULE" - } - - /** - * {@inheritDoc} - */ - override fun getName(): String { - return SCHEDULER_SCHEDULE_ACTION - } - - /** - * {@inheritDoc} - */ - override fun routes(): List { - return ImmutableList.of( - // create a scheduled job from report definition - // POST /_opendistro/reports_scheduler/schedule?job_id= - RestHandler.Route(RestRequest.Method.POST, SCHEDULE_URL), - // de-schedule a job - // DELETE /_opendistro/reports_scheduler/schedule?job_id= - RestHandler.Route(RestRequest.Method.DELETE, SCHEDULE_URL) - ) - } - - /** - * {@inheritDoc} - */ - override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - val jobId = request.param(JOB_ID) ?: throw IllegalArgumentException("Must specify id") - return RestChannelConsumer { channel: RestChannel -> - val handler = ReportsScheduleActionHandler(client, channel) - when { - request.method() == RestRequest.Method.POST -> handler.createSchedule(jobId, request) - request.method() == RestRequest.Method.DELETE -> handler.deleteSchedule(jobId) - else -> channel.sendResponse( - BytesRestResponse( - RestStatus.METHOD_NOT_ALLOWED, "${request.method()} is not allowed.")) - } - } - } -}