diff --git a/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java b/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java new file mode 100644 index 00000000..8a3302c0 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/constants/QueryPreference.java @@ -0,0 +1,14 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.constants; + +/** + * Collection of query preference + */ +public class QueryPreference { + public static final String PRIMARY = "_primary"; + public static final String LOCAL = "_local"; +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java index ec8e6f81..f7c5d98a 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDao.java @@ -47,6 +47,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; @@ -156,6 +157,7 @@ private IndexRequest toIndexRequest(Datasource datasource) { indexRequest.index(DatasourceExtension.JOB_INDEX_NAME); indexRequest.id(datasource.getName()); indexRequest.opType(DocWriteRequest.OpType.INDEX); + indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); return indexRequest; } catch (IOException e) { @@ -215,7 +217,7 @@ public void deleteDatasource(final Datasource datasource) { * @throws IOException exception */ public Datasource getDatasource(final String name) throws IOException { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY); GetResponse response; try { response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))); @@ -242,7 +244,7 @@ public Datasource getDatasource(final String name) throws IOException { * @param actionListener the action listener */ public void getDatasource(final String name, final ActionListener actionListener) { - GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name); + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, name).preference(QueryPreference.PRIMARY); StashedThreadContext.run(client, () -> client.get(request, new ActionListener<>() { @Override public void onResponse(final GetResponse response) { @@ -280,6 +282,7 @@ public void getDatasources(final String[] names, final ActionListener client.prepareMultiGet() .add(DatasourceExtension.JOB_INDEX_NAME, names) + .setPreference(QueryPreference.PRIMARY) .execute(createGetDataSourceQueryActionLister(MultiGetResponse.class, actionListener)) ); } @@ -293,6 +296,7 @@ public void getAllDatasources(final ActionListener> actionListe client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(QueryPreference.PRIMARY) .setSize(MAX_SIZE) .execute(createGetDataSourceQueryActionLister(SearchResponse.class, actionListener)) ); @@ -306,6 +310,7 @@ public List getAllDatasources() { client, () -> client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME) .setQuery(QueryBuilders.matchAllQuery()) + .setPreference(QueryPreference.PRIMARY) .setSize(MAX_SIZE) .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java index a930781c..4cc032bc 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java @@ -57,6 +57,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.constants.IndexSetting; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.shared.Constants; @@ -248,7 +249,7 @@ public Map getGeoIpData(final String indexName, final String ip) () -> client.prepareSearch(indexName) .setSize(1) .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) - .setPreference("_local") + .setPreference(QueryPreference.LOCAL) .setRequestCache(true) .get(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) ); diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index f14dd1f1..f2e509b1 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -85,6 +85,7 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable } geoIpDataDao.putGeoIpData(indexName, header, reader.iterator(), renewLock); } + geoIpDataDao.waitForAllShardsToBeStarted(indexName, renewLock); Instant endTime = Instant.now(); updateDatasourceAsSucceeded(indexName, datasource, manifest, fieldsToStore, startTime, endTime); diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json index 3f3d5aa1..567052d6 100644 --- a/src/main/resources/mappings/ip2geo_datasource.json +++ b/src/main/resources/mappings/ip2geo_datasource.json @@ -1,75 +1,130 @@ { - "properties" : { - "database" : { - "properties" : { - "fields" : { - "type" : "text" + "properties": { + "database": { + "properties": { + "fields": { + "type": "text" }, - "sha256_hash" : { - "type" : "text" + "provider": { + "type": "text" }, - "provider" : { - "type" : "text" + "sha256_hash": { + "type": "text" }, - "updated_at_in_epoch_millis" : { - "type" : "long" + "updated_at_in_epoch_millis": { + "type": "long" }, - "valid_for_in_days" : { - "type" : "long" + "valid_for_in_days": { + "type": "long" } } }, - "enabled_time" : { - "type" : "long" + "enabled_time": { + "type": "long" }, - "endpoint" : { - "type" : "text" + "endpoint": { + "type": "text" }, - "name" : { - "type" : "text" + "indices": { + "type": "text" }, - "indices" : { - "type" : "text" + "last_update_time": { + "type": "long" }, - "last_update_time" : { - "type" : "long" + "name": { + "type": "text" }, - "schedule" : { - "properties" : { - "interval" : { - "properties" : { - "period" : { - "type" : "long" + "schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" }, - "start_time" : { - "type" : "long" + "start_time": { + "type": "long" }, - "unit" : { - "type" : "text" + "unit": { + "type": "text" } } } } }, - "state" : { - "type" : "text" + "state": { + "type": "text" }, - "update_enabled" : { - "type" : "boolean" + "system_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } + } + } + }, + "task": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } }, - "update_stats" : { - "properties" : { - "last_failed_at_in_epoch_millis" : { - "type" : "long" + "update_enabled": { + "type": "boolean" + }, + "update_stats": { + "properties": { + "last_failed_at_in_epoch_millis": { + "type": "long" }, - "last_processing_time_in_millis" : { - "type" : "long" + "last_processing_time_in_millis": { + "type": "long" }, - "last_skipped_at_in_epoch_millis" : { - "type" : "long" + "last_skipped_at_in_epoch_millis": { + "type": "long" }, - "last_succeeded_at_in_epoch_millis" : { - "type" : "long" + "last_succeeded_at_in_epoch_millis": { + "type": "long" + } + } + }, + "user_schedule": { + "properties": { + "interval": { + "properties": { + "period": { + "type": "long" + }, + "start_time": { + "type": "long" + }, + "unit": { + "type": "text", + "fields": { + "keyword": { + "type": "keyword", + "ignore_above": 256 + } + } + } + } } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java index 09e2dd46..1c935b57 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/DatasourceDaoTests.java @@ -43,6 +43,7 @@ import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; @@ -205,6 +206,7 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime GetRequest request = (GetRequest) actionRequest; assertEquals(datasource.getName(), request.id()); assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(QueryPreference.PRIMARY, request.preference()); GetResponse response = getMockedGetResponse(isExist ? datasource : null); if (exception != null) { throw exception; @@ -262,6 +264,7 @@ public void testGetDatasources_whenValidInput_thenSucceed() { assertTrue(actionRequest instanceof MultiGetRequest); MultiGetRequest request = (MultiGetRequest) actionRequest; assertEquals(2, request.getItems().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); for (MultiGetRequest.Item item : request.getItems()) { assertEquals(DatasourceExtension.JOB_INDEX_NAME, item.index()); assertTrue(datasources.stream().filter(datasource -> datasource.getName().equals(item.id())).findAny().isPresent()); @@ -295,6 +298,7 @@ public void testGetAllDatasources_whenAsynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); @@ -322,6 +326,7 @@ public void testGetAllDatasources_whenSynchronous_thenSucceed() { assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.indices()[0]); assertEquals(QueryBuilders.matchAllQuery(), request.source().query()); assertEquals(1000, request.source().size()); + assertEquals(QueryPreference.PRIMARY, request.preference()); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java index 45380aa3..7f78de3a 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java @@ -45,6 +45,7 @@ import org.opensearch.common.SuppressForbidden; import org.opensearch.common.bytes.BytesReference; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.constants.QueryPreference; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; import org.opensearch.geospatial.shared.Constants; @@ -236,7 +237,7 @@ public void testGetGeoIpData_whenDataExist_thenReturnTheData() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(QueryPreference.LOCAL, request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query()); @@ -269,7 +270,7 @@ public void testGetGeoIpData_whenNoData_thenReturnEmpty() { verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assert actionRequest instanceof SearchRequest; SearchRequest request = (SearchRequest) actionRequest; - assertEquals("_local", request.preference()); + assertEquals(QueryPreference.LOCAL, request.preference()); assertEquals(1, request.source().size()); assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query());