From 4b8fd80984d1d95741cc9957bc81c59585b95db5 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 24 May 2023 14:46:30 -0700 Subject: [PATCH] Remove jitter and move index setting from DatasourceFacade to DatasourceExtension (#319) Signed-off-by: Heemin Kim --- .../ip2geo/common/DatasourceFacade.java | 17 +++-------------- .../ip2geo/jobscheduler/Datasource.java | 18 ------------------ .../jobscheduler/DatasourceExtension.java | 16 ++++++++++++++++ .../ip2geo/jobscheduler/DatasourceTests.java | 9 --------- 4 files changed, 19 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index a14a9c72..166b025a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -12,9 +12,7 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -40,7 +38,6 @@ import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.bytes.BytesReference; -import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentFactory; @@ -62,9 +59,6 @@ @Log4j2 public class DatasourceFacade { private static final Integer MAX_SIZE = 1000; - private static final Tuple INDEX_SETTING_NUM_OF_SHARDS = new Tuple<>("index.number_of_shards", 1); - private static final Tuple INDEX_SETTING_AUTO_EXPAND_REPLICAS = new Tuple<>("index.auto_expand_replicas", "0-all"); - private static final Tuple INDEX_SETTING_HIDDEN = new Tuple<>("index.hidden", true); private final Client client; private final ClusterService clusterService; private final ClusterSettings clusterSettings; @@ -76,22 +70,17 @@ public DatasourceFacade(final Client client, final ClusterService clusterService } /** - * Create a datasource index of single shard with auto expand replicas to all nodes + * Create datasource index * - * We want the index to expand to all replica so that datasource query request can be executed locally - * for faster ingestion time. + * @param stepListener setp listener */ public void createIndexIfNotExists(final StepListener stepListener) { if (clusterService.state().metadata().hasIndex(DatasourceExtension.JOB_INDEX_NAME) == true) { stepListener.onResponse(null); return; } - final Map indexSettings = new HashMap<>(); - indexSettings.put(INDEX_SETTING_NUM_OF_SHARDS.v1(), INDEX_SETTING_NUM_OF_SHARDS.v2()); - indexSettings.put(INDEX_SETTING_AUTO_EXPAND_REPLICAS.v1(), INDEX_SETTING_AUTO_EXPAND_REPLICAS.v2()); - indexSettings.put(INDEX_SETTING_HIDDEN.v1(), INDEX_SETTING_HIDDEN.v2()); final CreateIndexRequest createIndexRequest = new CreateIndexRequest(DatasourceExtension.JOB_INDEX_NAME).mapping(getIndexMapping()) - .settings(indexSettings); + .settings(DatasourceExtension.INDEX_SETTING); StashedThreadContext.run(client, () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { @Override public void onResponse(final CreateIndexResponse createIndexResponse) { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index f27ab286..ac81e563 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -51,9 +51,6 @@ public class Datasource implements Writeable, ScheduledJobParameter { * Prefix of indices having Ip2Geo data */ public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data"; - private static final long MAX_JITTER_IN_MINUTES = 5; - private static final long ONE_DAY_IN_HOURS = 24; - private static final long ONE_HOUR_IN_MINUTES = 60; /** * Default fields for job scheduling @@ -285,21 +282,6 @@ public Long getLockDurationSeconds() { return Ip2GeoLockService.LOCK_DURATION_IN_SECONDS; } - /** - * Jitter in scheduling a task - * - * We want a job to be delayed randomly with range of (0, 5) minutes for the - * next execution time. - * - * @see ScheduledJobParameter#getJitter() - * - * @return the jitter - */ - @Override - public Double getJitter() { - return MAX_JITTER_IN_MINUTES / ((double) schedule.getInterval() * ONE_DAY_IN_HOURS * ONE_HOUR_IN_MINUTES); - } - /** * Enable auto update of GeoIP data */ diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java index edea50bd..1cf51338 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java @@ -5,6 +5,8 @@ package org.opensearch.geospatial.ip2geo.jobscheduler; +import java.util.Map; + import org.opensearch.jobscheduler.spi.JobSchedulerExtension; import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; @@ -21,6 +23,20 @@ public class DatasourceExtension implements JobSchedulerExtension { * Job index name for a datasource */ public static final String JOB_INDEX_NAME = ".scheduler_geospatial_ip2geo_datasource"; + /** + * Job index setting + * + * We want it to be single shard so that job can be run only in a single node by job scheduler. + * We want it to expand to all replicas so that querying to this index can be done locally to reduce latency. + */ + public static final Map INDEX_SETTING = Map.of( + "index.number_of_shards", + 1, + "index.auto_expand_replicas", + "0-all", + "index.hidden", + true + ); @Override public String getJobType() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java index 679a043d..b8be8068 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -15,7 +15,6 @@ import java.util.Arrays; import java.util.Locale; -import org.opensearch.common.Randomness; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -77,14 +76,6 @@ public void testGetIndexNameFor() { ); } - public void testGetJitter() { - Datasource datasource = new Datasource(); - datasource.setSchedule(new IntervalSchedule(Instant.now(), Randomness.get().ints(1, 31).findFirst().getAsInt(), ChronoUnit.DAYS)); - long intervalInMinutes = datasource.getSchedule().getInterval() * 60l * 24l; - double sixMinutes = 6; - assertTrue(datasource.getJitter() * intervalInMinutes <= sixMinutes); - } - public void testIsExpired() { Datasource datasource = new Datasource(); // never expire if validForInDays is null