Skip to content

Commit

Permalink
Upgrade settings from OpenDistro to OpenSearch.
Browse files Browse the repository at this point in the history
Signed-off-by: dblock <[email protected]>
  • Loading branch information
dblock committed May 4, 2021
1 parent b500313 commit ba438c6
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingUpgrader;
import org.opensearch.common.settings.GenericSettingUpgrader;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.NamedXContentRegistry;
Expand All @@ -67,7 +69,6 @@
import java.util.Set;
import java.util.function.Supplier;


public class JobSchedulerPlugin extends Plugin implements ExtensiblePlugin {

public static final String OPEN_DISTRO_JOB_SCHEDULER_THREAD_POOL_NAME = "open_distro_job_scheduler";
Expand Down Expand Up @@ -111,9 +112,35 @@ public List<Setting<?>> getSettings() {
settingList.add(JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT);
settingList.add(JobSchedulerSettings.SWEEP_PERIOD);
settingList.add(JobSchedulerSettings.JITTER_LIMIT);
// legacy OpenDistro settings
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_PAGE_SIZE);
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_REQUEST_TIMEOUT);
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_BACKOFF_MILLIS);
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_BACKOFF_RETRY_COUNT);
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_PERIOD);
settingList.add(JobSchedulerSettings.LEGACY_OPENDISTRO_JITTER_LIMIT);
return settingList;
}



public List<SettingUpgrader<?>> getSettingUpgraders() {
List<SettingUpgrader<?>> settingUpgraders = new ArrayList<>();
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_PAGE_SIZE,
JobSchedulerSettings.SWEEP_PAGE_SIZE));
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_REQUEST_TIMEOUT,
JobSchedulerSettings.REQUEST_TIMEOUT));
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_BACKOFF_MILLIS,
JobSchedulerSettings.SWEEP_BACKOFF_MILLIS));
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_BACKOFF_RETRY_COUNT,
JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT));
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_SWEEP_PERIOD,
JobSchedulerSettings.SWEEP_PERIOD));
settingUpgraders.add(new GenericSettingUpgrader<>(JobSchedulerSettings.LEGACY_OPENDISTRO_JITTER_LIMIT,
JobSchedulerSettings.JITTER_LIMIT));
return settingUpgraders;
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
final int processorCount = OpenSearchExecutors.allocatedProcessors(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,52 @@

public class JobSchedulerSettings {
public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"opendistro.jobscheduler.request_timeout",
"opensearch.jobscheduler.request_timeout",
TimeValue.timeValueSeconds(10),
Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> SWEEP_BACKOFF_MILLIS = Setting.positiveTimeSetting(
"opendistro.jobscheduler.sweeper.backoff_millis",
"opensearch.jobscheduler.sweeper.backoff_millis",
TimeValue.timeValueMillis(50),
Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> SWEEP_BACKOFF_RETRY_COUNT = Setting.intSetting(
"opendistro.jobscheduler.retry_count",
3,
"opensearch.jobscheduler.retry_count",
3,
Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<TimeValue> SWEEP_PERIOD = Setting.positiveTimeSetting(
"opendistro.jobscheduler.sweeper.period",
"opensearch.jobscheduler.sweeper.period",
TimeValue.timeValueMinutes(5),
Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Integer> SWEEP_PAGE_SIZE = Setting.intSetting(
"opendistro.jobscheduler.sweeper.page_size",
100,
"opensearch.jobscheduler.sweeper.page_size",
100,
Setting.Property.NodeScope, Setting.Property.Dynamic);

public static final Setting<Double> JITTER_LIMIT = Setting.doubleSetting(
"opendistro.jobscheduler.jitter_limit",
0.60, 0, 0.95,
"opensearch.jobscheduler.jitter_limit",
0.60, 0, 0.95,
Setting.Property.NodeScope, Setting.Property.Dynamic);

// legacy settings from OpenDistro

public static final Setting<TimeValue> LEGACY_OPENDISTRO_REQUEST_TIMEOUT = REQUEST_TIMEOUT.withKey(
"opendistro.jobscheduler.request_timeout");

public static final Setting<TimeValue> LEGACY_OPENDISTRO_SWEEP_BACKOFF_MILLIS = SWEEP_BACKOFF_MILLIS.withKey(
"opendistro.jobscheduler.sweeper.backoff_millis");

public static final Setting<Integer> LEGACY_OPENDISTRO_SWEEP_BACKOFF_RETRY_COUNT = SWEEP_BACKOFF_RETRY_COUNT.withKey(
"opendistro.jobscheduler.retry_count");

public static final Setting<TimeValue> LEGACY_OPENDISTRO_SWEEP_PERIOD = SWEEP_PERIOD.withKey(
"opendistro.jobscheduler.sweeper.period");

public static final Setting<Integer> LEGACY_OPENDISTRO_SWEEP_PAGE_SIZE = SWEEP_PAGE_SIZE.withKey(
"opendistro.jobscheduler.sweeper.page_size");

public static final Setting<Double> LEGACY_OPENDISTRO_JITTER_LIMIT = JITTER_LIMIT.withKey(
"opendistro.jobscheduler.jitter_limit");
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,36 @@ private void addConfigListeners() {
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_PERIOD,
timeValue -> {
sweepPeriod = timeValue;
log.debug("Reinitializing background full sweep with period: " + sweepPeriod.getMinutes());
log.debug("Reinitializing background full sweep with period: " + this.sweepPeriod.getMinutes());
initBackgroundSweep();
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_PAGE_SIZE,
intValue -> sweepPageMaxSize = intValue);
intValue -> {
sweepPageMaxSize = intValue;
log.debug("Setting background sweep page size: " + this.sweepPageMaxSize);
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.REQUEST_TIMEOUT,
timeValue -> this.sweepSearchTimeout = timeValue);
timeValue -> {
this.sweepSearchTimeout = timeValue;
log.debug("Setting background sweep search timeout: " + this.sweepSearchTimeout.getMinutes());
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_BACKOFF_MILLIS,
timeValue -> {
this.sweepSearchBackoffMillis = timeValue;
this.sweepSearchBackoff = this.updateRetryPolicy();
log.debug("Setting background sweep search backoff: " + this.sweepSearchBackoffMillis.getMillis());
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.SWEEP_BACKOFF_RETRY_COUNT,
intValue -> {
this.sweepSearchBackoffRetryCount = intValue;
this.sweepSearchBackoff = this.updateRetryPolicy();
log.debug("Setting background sweep search backoff retry count: " + this.sweepSearchBackoffRetryCount);
});
clusterService.getClusterSettings().addSettingsUpdateConsumer(JobSchedulerSettings.JITTER_LIMIT,
doubleValue -> this.jitterLimit = doubleValue);
doubleValue -> {
this.jitterLimit = doubleValue;
log.debug("Setting background sweep jitter limit: " + this.jitterLimit);
});
}

private BackoffPolicy updateRetryPolicy() {
Expand Down

0 comments on commit ba438c6

Please sign in to comment.