From 535fb023f843fcd4fa683b09f85b64bfcefb6d5f Mon Sep 17 00:00:00 2001 From: Anantha Krishna Bhatta Date: Wed, 14 Oct 2020 13:11:37 -0700 Subject: [PATCH] Added settings/configuration support to plugin --- .../src/main/config/reports-scheduler.yml | 26 ++ .../job/parameter/JobConstant.java | 1 - .../ReportsSchedulerPlugin.kt | 12 +- .../settings/PluginSettings.kt | 313 ++++++++++++++++++ 4 files changed, 350 insertions(+), 2 deletions(-) create mode 100644 reports-scheduler/src/main/config/reports-scheduler.yml create mode 100644 reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/settings/PluginSettings.kt diff --git a/reports-scheduler/src/main/config/reports-scheduler.yml b/reports-scheduler/src/main/config/reports-scheduler.yml new file mode 100644 index 00000000..3419587c --- /dev/null +++ b/reports-scheduler/src/main/config/reports-scheduler.yml @@ -0,0 +1,26 @@ +--- +## +# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. +# +## + +# configuration file for the reports scheduler plugin +opendistro.reports.scheduler: + general: + operationTimeoutMs: 60000 # 60 seconds, Minimum 100ms + polling: + jobLockDurationSeconds: 300 # 5 Minutes, Minimum 10 seconds + minPollingDurationSeconds: 300 # 5 Minutes, Minimum 60 seconds + maxPollingDurationSeconds: 900 # 15 Minutes, Minimum 5 Minutes + maxLockRetries: 1 # Max number of retries to retry locking diff --git a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java index 21d6a74f..26831b8d 100644 --- a/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java +++ b/reports-scheduler/src/main/java/com/amazon/opendistroforelasticsearch/reportsscheduler/job/parameter/JobConstant.java @@ -22,5 +22,4 @@ public class JobConstant { public static final String SCHEDULE_FIELD = "schedule"; public static final String ENABLED_TIME_FILED = "enabled_time"; public static final String REPORT_DEFINITION_ID = "report_definition_id"; - public static final String LOCK_DURATION_SECONDS = "lock_duration_seconds"; } diff --git a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt index ba3b7584..6ba006b0 100644 --- a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt +++ b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/ReportsSchedulerPlugin.kt @@ -22,6 +22,7 @@ import com.amazon.opendistroforelasticsearch.reportsscheduler.job.ReportsSchedul import com.amazon.opendistroforelasticsearch.reportsscheduler.job.ScheduledReportJobParser import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportsJobRestHandler import com.amazon.opendistroforelasticsearch.reportsscheduler.resthandler.ReportsScheduleRestHandler +import com.amazon.opendistroforelasticsearch.reportsscheduler.settings.PluginSettings import com.google.common.collect.ImmutableList import org.elasticsearch.client.Client import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver @@ -30,6 +31,7 @@ import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.io.stream.NamedWriteableRegistry import org.elasticsearch.common.settings.ClusterSettings import org.elasticsearch.common.settings.IndexScopedSettings +import org.elasticsearch.common.settings.Setting import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.settings.SettingsFilter import org.elasticsearch.common.xcontent.NamedXContentRegistry @@ -62,6 +64,13 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { private val jobRunner = ReportsSchedulerJobRunnerProxy.getJobRunnerInstance() private lateinit var clusterService: ClusterService // initialized in createComponents() + /** + * {@inheritDoc} + */ + override fun getSettings(): List> { + return PluginSettings.getAllSettings() + } + /** * {@inheritDoc} */ @@ -78,8 +87,9 @@ class ReportsSchedulerPlugin : Plugin(), ActionPlugin, JobSchedulerExtension { indexNameExpressionResolver: IndexNameExpressionResolver, repositoriesServiceSupplier: Supplier ): Collection { - jobRunner.createRunnerInstance(clusterService, threadPool, client) this.clusterService = clusterService + PluginSettings.addSettingsUpdateConsumer(clusterService) + jobRunner.createRunnerInstance(clusterService, threadPool, client) return emptyList() } diff --git a/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/settings/PluginSettings.kt b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/settings/PluginSettings.kt new file mode 100644 index 00000000..a20891f5 --- /dev/null +++ b/reports-scheduler/src/main/kotlin/com/amazon/opendistroforelasticsearch/reportsscheduler/settings/PluginSettings.kt @@ -0,0 +1,313 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.opendistroforelasticsearch.reportsscheduler.settings + +import com.amazon.opendistroforelasticsearch.reportsscheduler.ReportsSchedulerPlugin.Companion.PLUGIN_NAME +import org.apache.logging.log4j.LogManager +import org.elasticsearch.bootstrap.BootstrapInfo +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.settings.Setting +import org.elasticsearch.common.settings.Setting.Property.Dynamic +import org.elasticsearch.common.settings.Setting.Property.NodeScope +import org.elasticsearch.common.settings.Settings +import java.io.IOException +import java.nio.file.Path + +/** + * settings specific to reports scheduler Plugin. + */ +internal object PluginSettings { + + /** + * Settings Key prefix for this plugin. + */ + private const val KEY_PREFIX = "opendistro.reports.scheduler" + + /** + * General settings Key prefix. + */ + private const val GENERAL_KEY_PREFIX = "$KEY_PREFIX.general" + + /** + * Polling settings Key prefix. + */ + private const val POLLING_KEY_PREFIX = "$KEY_PREFIX.polling" + + /** + * Operation timeout for network operations. + */ + private const val OPERATION_TIMEOUT_MS_KEY = "$GENERAL_KEY_PREFIX.operationTimeoutMs" + + /** + * Setting to choose Job lock duration. + */ + private const val JOB_LOCK_DURATION_S_KEY = "$POLLING_KEY_PREFIX.jobLockDurationSeconds" + + /** + * Setting to choose Minimum polling duration. + */ + private const val MIN_POLLING_DURATION_S_KEY = "$POLLING_KEY_PREFIX.minPollingDurationSeconds" + + /** + * Setting to choose Maximum polling duration. + */ + private const val MAX_POLLING_DURATION_S_KEY = "$POLLING_KEY_PREFIX.maxPollingDurationSeconds" + + /** + * Setting to choose Maximum number of retries to try locking. + */ + private const val MAX_LOCK_RETRIES_KEY = "$POLLING_KEY_PREFIX.maxLockRetries" + + /** + * Default operation timeout for network operations. + */ + private const val DEFAULT_OPERATION_TIMEOUT_MS = 60000 + + /** + * Minimum operation timeout for network operations. + */ + private const val MINIMUM_OPERATION_TIMEOUT_MS = 100 + + /** + * Default Job lock duration. + */ + private const val DEFAULT_JOB_LOCK_DURATION_S = 300 + + /** + * Minimum Job lock duration. + */ + private const val MINIMUM_JOB_LOCK_DURATION_S = 10 + + /** + * Default Minimum polling duration. + */ + private const val DEFAULT_MIN_POLLING_DURATION_S = 300 + + /** + * Minimum Min polling duration. + */ + private const val MINIMUM_MIN_POLLING_DURATION_S = 60 + + /** + * Default Maximum polling duration. + */ + private const val DEFAULT_MAX_POLLING_DURATION_S = 900 + + /** + * Minimum Maximum polling duration. + */ + private const val MINIMUM_MAX_POLLING_DURATION_S = 300 + + /** + * Default number of retries to try locking. + */ + private const val DEFAULT_MAX_LOCK_RETRIES = 0 + + /** + * Minimum number of retries to try locking. + */ + private const val MINIMUM_LOCK_RETRIES = 0 + + /** + * Operation timeout setting in ms for I/O operations + */ + @Volatile + var operationTimeoutMs: Int + + /** + * Job lock duration + */ + @Volatile + var jobLockDurationSeconds: Int + + /** + * Minimum polling duration + */ + @Volatile + var minPollingDurationSeconds: Int + + /** + * Maximum polling duration. + */ + @Volatile + var maxPollingDurationSeconds: Int + + /** + * Max number of retries to try locking. + */ + @Volatile + var maxLockRetries: Int + + private const val DECIMAL_RADIX: Int = 10 + + private val log = LogManager.getLogger(javaClass) + private val defaultSettings: Map + + init { + var settings: Settings? = null + val configDirName = BootstrapInfo.getSystemProperties()?.get("es.path.conf")?.toString() + if (configDirName != null) { + val defaultSettingYmlFile = Path.of(configDirName, PLUGIN_NAME, "reports-scheduler.yml") + try { + settings = Settings.builder().loadFromPath(defaultSettingYmlFile).build() + } catch (exception: IOException) { + log.warn("$PLUGIN_NAME:Failed to load ${defaultSettingYmlFile.toAbsolutePath()}") + } + } + // Initialize the settings values to default values + operationTimeoutMs = (settings?.get(OPERATION_TIMEOUT_MS_KEY)?.toInt()) ?: DEFAULT_OPERATION_TIMEOUT_MS + jobLockDurationSeconds = (settings?.get(JOB_LOCK_DURATION_S_KEY)?.toInt()) ?: DEFAULT_JOB_LOCK_DURATION_S + minPollingDurationSeconds = (settings?.get(MIN_POLLING_DURATION_S_KEY)?.toInt()) + ?: DEFAULT_MIN_POLLING_DURATION_S + maxPollingDurationSeconds = (settings?.get(MAX_POLLING_DURATION_S_KEY)?.toInt()) + ?: DEFAULT_MAX_POLLING_DURATION_S + maxLockRetries = (settings?.get(MAX_LOCK_RETRIES_KEY)?.toInt()) ?: DEFAULT_MAX_LOCK_RETRIES + + defaultSettings = mapOf( + OPERATION_TIMEOUT_MS_KEY to operationTimeoutMs.toString(DECIMAL_RADIX), + JOB_LOCK_DURATION_S_KEY to jobLockDurationSeconds.toString(DECIMAL_RADIX), + MIN_POLLING_DURATION_S_KEY to minPollingDurationSeconds.toString(DECIMAL_RADIX), + MAX_POLLING_DURATION_S_KEY to maxPollingDurationSeconds.toString(DECIMAL_RADIX), + MAX_LOCK_RETRIES_KEY to maxLockRetries.toString(DECIMAL_RADIX) + ) + } + + private val OPERATION_TIMEOUT_MS: Setting = Setting.intSetting( + OPERATION_TIMEOUT_MS_KEY, + defaultSettings[OPERATION_TIMEOUT_MS_KEY]!!.toInt(), + MINIMUM_OPERATION_TIMEOUT_MS, + NodeScope, Dynamic + ) + + private val JOB_LOCK_DURATION_S: Setting = Setting.intSetting( + JOB_LOCK_DURATION_S_KEY, + defaultSettings[JOB_LOCK_DURATION_S_KEY]!!.toInt(), + MINIMUM_JOB_LOCK_DURATION_S, + NodeScope, Dynamic + ) + + private val MIN_POLLING_DURATION_S: Setting = Setting.intSetting( + MIN_POLLING_DURATION_S_KEY, + defaultSettings[MIN_POLLING_DURATION_S_KEY]!!.toInt(), + MINIMUM_MIN_POLLING_DURATION_S, + NodeScope, Dynamic + ) + + private val MAX_POLLING_DURATION_S: Setting = Setting.intSetting( + MAX_POLLING_DURATION_S_KEY, + defaultSettings[MAX_POLLING_DURATION_S_KEY]!!.toInt(), + MINIMUM_MAX_POLLING_DURATION_S, + NodeScope, Dynamic + ) + + private val MAX_LOCK_RETRIES: Setting = Setting.intSetting( + MAX_LOCK_RETRIES_KEY, + defaultSettings[MAX_LOCK_RETRIES_KEY]!!.toInt(), + MINIMUM_LOCK_RETRIES, + NodeScope, Dynamic + ) + + /** + * Returns list of additional settings available specific to this plugin. + * + * @return list of settings defined in this plugin + */ + fun getAllSettings(): List> { + return listOf(OPERATION_TIMEOUT_MS, + JOB_LOCK_DURATION_S, + MIN_POLLING_DURATION_S, + MAX_POLLING_DURATION_S, + MAX_LOCK_RETRIES + ) + } + + /** + * Update the setting variables to setting values from local settings + * @param clusterService cluster service instance + */ + private fun updateSettingValuesFromLocal(clusterService: ClusterService) { + operationTimeoutMs = OPERATION_TIMEOUT_MS.get(clusterService.settings) + jobLockDurationSeconds = JOB_LOCK_DURATION_S.get(clusterService.settings) + minPollingDurationSeconds = MIN_POLLING_DURATION_S.get(clusterService.settings) + maxPollingDurationSeconds = MAX_POLLING_DURATION_S.get(clusterService.settings) + maxLockRetries = MAX_LOCK_RETRIES.get(clusterService.settings) + } + + /** + * Update the setting variables to setting values from cluster settings + * @param clusterService cluster service instance + */ + private fun updateSettingValuesFromCluster(clusterService: ClusterService) { + val clusterOperationTimeoutMs = clusterService.clusterSettings.get(OPERATION_TIMEOUT_MS) + if (clusterOperationTimeoutMs != null) { + log.debug("$PLUGIN_NAME:$OPERATION_TIMEOUT_MS_KEY -autoUpdatedTo-> $clusterOperationTimeoutMs") + operationTimeoutMs = clusterOperationTimeoutMs + } + val clusterJobLockDurationSeconds = clusterService.clusterSettings.get(JOB_LOCK_DURATION_S) + if (clusterJobLockDurationSeconds != null) { + log.debug("$PLUGIN_NAME:$JOB_LOCK_DURATION_S_KEY -autoUpdatedTo-> $clusterJobLockDurationSeconds") + jobLockDurationSeconds = clusterJobLockDurationSeconds + } + val clusterMinPollingDurationSeconds = clusterService.clusterSettings.get(MIN_POLLING_DURATION_S) + if (clusterMinPollingDurationSeconds != null) { + log.debug("$PLUGIN_NAME:$MIN_POLLING_DURATION_S_KEY -autoUpdatedTo-> $clusterMinPollingDurationSeconds") + minPollingDurationSeconds = clusterMinPollingDurationSeconds + } + val clusterMaxPollingDurationSeconds = clusterService.clusterSettings.get(MAX_POLLING_DURATION_S) + if (clusterMaxPollingDurationSeconds != null) { + log.debug("$PLUGIN_NAME:$MAX_POLLING_DURATION_S_KEY -autoUpdatedTo-> $clusterMaxPollingDurationSeconds") + maxPollingDurationSeconds = clusterMaxPollingDurationSeconds + } + val clusterMaxLockRetries = clusterService.clusterSettings.get(MAX_LOCK_RETRIES) + if (clusterMaxLockRetries != null) { + log.debug("$PLUGIN_NAME:$MAX_LOCK_RETRIES_KEY -autoUpdatedTo-> $clusterMaxLockRetries") + maxLockRetries = clusterMaxLockRetries + } + } + + /** + * adds Settings update listeners to all settings. + * @param clusterService cluster service instance + */ + fun addSettingsUpdateConsumer(clusterService: ClusterService) { + updateSettingValuesFromLocal(clusterService) + // Update the variables to cluster setting values + // If the cluster is not yet started then we get default values again + updateSettingValuesFromCluster(clusterService) + + clusterService.clusterSettings.addSettingsUpdateConsumer(OPERATION_TIMEOUT_MS) { + operationTimeoutMs = it + log.info("$PLUGIN_NAME:$OPERATION_TIMEOUT_MS_KEY -updatedTo-> $it") + } + clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_LOCK_DURATION_S) { + jobLockDurationSeconds = it + log.info("$PLUGIN_NAME:$JOB_LOCK_DURATION_S_KEY -updatedTo-> $it") + } + clusterService.clusterSettings.addSettingsUpdateConsumer(MIN_POLLING_DURATION_S) { + minPollingDurationSeconds = it + log.info("$PLUGIN_NAME:$MIN_POLLING_DURATION_S_KEY -updatedTo-> $it") + } + clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_POLLING_DURATION_S) { + maxPollingDurationSeconds = it + log.info("$PLUGIN_NAME:$MAX_POLLING_DURATION_S_KEY -updatedTo-> $it") + } + clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_LOCK_RETRIES) { + maxLockRetries = it + log.info("$PLUGIN_NAME:$MAX_LOCK_RETRIES_KEY -updatedTo-> $it") + } + } +}