From abf703d2c3ca440aa9220c68c763e25f78f6860c Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 29 Jun 2023 15:40:36 -0700 Subject: [PATCH] Few changes in query preference 1. Use _primary preference to get datasource metadata so that it can read the latest data. RefreshPolicy.IMMEDIATE won't refresh replica shards immediately according to #346 2. Set 0-all for auto_expand_replicas from the start so that geo data can be available in all data nodes quick enough after swithcing to the updated index 3. Update datasource metadata index mapping Signed-off-by: Heemin Kim --- .../geospatial/constants/QueryPreference.java | 14 ++ .../geospatial/ip2geo/dao/DatasourceDao.java | 9 +- .../geospatial/ip2geo/dao/GeoIpDataDao.java | 15 +- .../resources/mappings/ip2geo_datasource.json | 149 ++++++++++++------ .../ip2geo/dao/DatasourceDaoTests.java | 5 + .../ip2geo/dao/GeoIpDataDaoTests.java | 10 +- 6 files changed, 138 insertions(+), 64 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/constants/QueryPreference.java diff --git a/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java b/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java new file mode 100644 index 00000000..8a3302c0 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.constants; + +/** + * Collection of query preference + */ +public class QueryPreference { + public static final String PRIMARY = "_primary"; + public static final String LOCAL = "_local"; +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java index ec8e6f81..f7c5d98a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java @@ -47,6 +47,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; @@ -156,6 +157,7 @@ private IndexRequest toIndexRequest(Datasource datasource) { indexRequest.index(DatasourceExtension.JOB_INDEX_NAME); indexRequest.id(datasource.getName()); indexRequest.opType(DocWriteRequest.OpType.INDEX); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); return indexRequest; } catch (IOException e) { @@ -215,7 +217,7 @@ public void deleteDatasource(final Datasource datasource) { * @throws IOException exception */ public Datasource getDatasource(final String name) throws IOException { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY); GetResponse response; try { response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))); @@ -242,7 +244,7 @@ public Datasource getDatasource(final String name) throws IOException { * @param actionListener the action listener */ public void getDatasource(final String name, final ActionListener actionListener) { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY); StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() { @Override public void onResponse(final GetResponse response) { @@ -280,6 +282,7 @@ public void getDatasources(final String[] names, final ActionListener client.prepareMultiGet() .add(DatasourceExtension.JOB_INDEX_NAME, names) + .setPreference(QueryPreference.PRIMARY) .execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener)) ); } @@ -293,6 +296,7 @@ public void getAllDatasources(final ActionListener> actionListe client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(QueryPreference.PRIMARY) .setSize(MAX_SIZE) .execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener)) ); @@ -306,6 +310,7 @@ public List getAllDatasources() { client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(QueryPreference.PRIMARY) .setSize(MAX_SIZE) .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java index a930781c..f15c0a04 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java @@ -57,6 +57,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.constants.IndexSetting; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.shared.Constants; @@ -74,19 +75,14 @@ public class GeoIpDataDao { private static final Map INDEX_SETTING_TO_CREATE = Map.of( IndexSetting.NUMBER_OF_SHARDS, 1, - IndexSetting.NUMBER_OF_REPLICAS, - 0, + IndexSetting.AUTO_EXPAND_REPLICAS, + "0-all", IndexSetting.REFRESH_INTERVAL, -1, IndexSetting.HIDDEN, true ); - private static final Map INDEX_SETTING_TO_FREEZE = Map.of( - IndexSetting.AUTO_EXPAND_REPLICAS, - "0-all", - IndexSetting.BLOCKS_WRITE, - true - ); + private static final Map INDEX_SETTING_TO_FREEZE = Map.of(IndexSetting.BLOCKS_WRITE, true); private final ClusterService clusterService; private final ClusterSettings clusterSettings; private final Client client; @@ -131,6 +127,7 @@ private void freezeIndex(final String indexName) { .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)); }); + } /** @@ -248,7 +245,7 @@ public Map getGeoIpData(final String indexName, final String ip) () -> client.prepareSearch(indexName) .setSize(1) .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) - .setPreference("_local") + .setPreference(QueryPreference.LOCAL) .setRequestCache(true) .get(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) ); diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json index 3f3d5aa1..567052d6 100644 --- a/src/main/resources/mappings/ip2geo_datasource.json +++ b/src/main/resources/mappings/ip2geo_datasource.json @@ -1,75 +1,130 @@ { - "properties" : { - "database" : { - "properties" : { - "fields" : { - "type" : "text" + "properties": { + "database": { + "properties": { + "fields": { + "type": "text" }, - "sha256_hash" : { - "type" : "text" + "provider": { + "type": "text" }, - "provider" : { - "type" : "text" + "sha256_hash": { + "type": "text" }, - "updated_at_in_epoch_millis" : { - "type" : "long" + "updated_at_in_epoch_millis": { + "type": "long" }, - "valid_for_in_days" : { - "type" : "long" + "valid_for_in_days": { + "type": "long" } } }, - "enabled_time" : { - "type" : "long" + "enabled_time": { + "type": "long" }, - "endpoint" : { - "type" : "text" + "endpoint": { + "type": "text" }, - "name" : { - "type" : "text" + "indices": { + "type": "text" }, - "indices" : { - "type" : "text" + "last_update_time": { + "type": "long" }, - "last_update_time" : { - "type" : "long" + "name": { + "type": "text" }, - "schedule" : { - "properties" : { - "interval" : { - "properties" : { - "period" : { - "type" : "long" + "schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" }, - "start_time" : { - "type" : "long" + "start_time": { + "type": "long" }, - "unit" : { - "type" : "text" + "unit": { + "type": "text" } } } } }, - "state" : { - "type" : "text" + "state": { + "type": "text" }, - "update_enabled" : { - "type" : "boolean" + "system_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "task": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } }, - "update_stats" : { - "properties" : { - "last_failed_at_in_epoch_millis" : { - "type" : "long" + "update_enabled": { + "type": "boolean" + }, + "update_stats": { + "properties": { + "last_failed_at_in_epoch_millis": { + "type": "long" }, - "last_processing_time_in_millis" : { - "type" : "long" + "last_processing_time_in_millis": { + "type": "long" }, - "last_skipped_at_in_epoch_millis" : { - "type" : "long" + "last_skipped_at_in_epoch_millis": { + "type": "long" }, - "last_succeeded_at_in_epoch_millis" : { - "type" : "long" + "last_succeeded_at_in_epoch_millis": { + "type": "long" + } + } + }, + "user_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java index 09e2dd46..1c935b57 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java @@ -43,6 +43,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; @@ -205,6 +206,7 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime GetRequest request = (GetRequest) actionRequest; assertEquals(datasource.getName(), request.id()); assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(QueryPreference.PRIMARY, request.preference()); GetResponse response = getMockedGetResponse(isExist ? datasource : null); if (exception != null) { throw exception; @@ -262,6 +264,7 @@ public void testGetDatasources_whenValidInput_thenSucceed() { assertTrue(actionRequest instanceof MultiGetRequest); MultiGetRequest request = (MultiGetRequest) actionRequest; assertEquals(2, request.getItems().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); for (MultiGetRequest.Item item : request.getItems()) { assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index()); assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent()); @@ -295,6 +298,7 @@ public void testGetAllDatasources_whenAsynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); @@ -322,6 +326,7 @@ public void testGetAllDatasources_whenSynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java index 45380aa3..812befb0 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java @@ -45,6 +45,7 @@ import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesReference; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.shared.Constants; @@ -80,8 +81,7 @@ public void testCreateIndexIfNotExistsWithoutExistingIndex() { CreateIndexRequest request = (CreateIndexRequest) actionRequest; assertEquals(index, request.index()); assertEquals(1, (int) request.settings().getAsInt("index.number_of_shards", 0)); - assertNull(request.settings().get("index.auto_expand_replicas")); - assertEquals(0, (int) request.settings().getAsInt("index.number_of_replicas", 1)); + assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); assertEquals(-1, (int) request.settings().getAsInt("index.refresh_interval", 0)); assertEquals(true, request.settings().getAsBoolean("index.hidden", false)); @@ -214,8 +214,6 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { assertEquals(1, request.indices().length); assertEquals(index, request.indices()[0]); assertEquals(true, request.settings().getAsBoolean("index.blocks.write", false)); - assertNull(request.settings().get("index.num_of_replica")); - assertEquals("0-all", request.settings().get("index.auto_expand_replicas")); return null; } else { throw new RuntimeException("invalid request is called"); @@ -236,7 +234,7 @@ public void testGetGeoIpData_whenDataExist_thenReturnTheData() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(QueryPreference.LOCAL, request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query()); @@ -269,7 +267,7 @@ public void testGetGeoIpData_whenNoData_thenReturnEmpty() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(QueryPreference.LOCAL, request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());