From 07d7e7206907d78109fd98b11af871ba688df7d8 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Thu, 22 Jun 2023 13:39:55 -0700 Subject: [PATCH] Add cache layer to reduce GeoIp data retrieval latency Signed-off-by: Heemin Kim --- .../ip2geo/action/GetDatasourceRequest.java | 7 +- .../ip2geo/common/Ip2GeoSettings.java | 15 +- .../geospatial/ip2geo/dao/GeoIpDataDao.java | 90 +------ .../ip2geo/dao/Ip2GeoCachedDao.java | 57 +++-- .../ip2geo/processor/Ip2GeoProcessor.java | 83 ++----- .../geospatial/plugin/GeospatialPlugin.java | 2 +- .../action/DeleteDatasourceRequestTests.java | 14 ++ .../action/GetDatasourceRequestTests.java | 12 + .../GetDatasourceTransportActionTests.java | 13 ++ .../ip2geo/dao/GeoIpDataDaoTests.java | 56 ++--- .../ip2geo/dao/Ip2GeoCachedDaoTests.java | 27 ++- .../jobscheduler/DatasourceRunnerTests.java | 4 + .../processor/Ip2GeoProcessorTests.java | 221 +++++++++--------- 13 files changed, 278 insertions(+), 323 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java index aa32c8f7..889d7899 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java @@ -10,7 +10,6 @@ import lombok.Getter; import lombok.Setter; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.StreamInput; @@ -52,10 +51,12 @@ public GetDatasourceRequest(final StreamInput in) throws IOException { @Override public ActionRequestValidationException validate() { + ActionRequestValidationException errors = null; if (names == null) { - throw new OpenSearchException("names should not be null"); + errors = new ActionRequestValidationException(); + errors.addValidationError("names should not be null"); } - return null; + return errors; } @Override diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java index 12d06b50..0c95fc92 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java @@ -52,16 +52,11 @@ public class Ip2GeoSettings { ); /** - * Multi search max concurrent searches - * - * Multi search is used only when a field contains a list of ip addresses. - * - * When the value is 0, it will use default value which will be decided - * based on node count and search thread pool size. + * Max size for geo data cache */ - public static final Setting MAX_CONCURRENT_SEARCHES = Setting.intSetting( - "plugins.geospatial.ip2geo.processor.max_concurrent_searches", - 0, + public static final Setting CACHE_SIZE = Setting.longSetting( + "plugins.geospatial.ip2geo.processor.cache_size", + 1000, 0, Setting.Property.NodeScope, Setting.Property.Dynamic @@ -72,7 +67,7 @@ public class Ip2GeoSettings { * @return a list of all settings for Ip2Geo feature */ public static final List> settings() { - return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, MAX_CONCURRENT_SEARCHES); + return List.of(DATASOURCE_ENDPOINT, DATASOURCE_UPDATE_INTERVAL, TIMEOUT, CACHE_SIZE); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java index 2c0c5d72..a4abdb35 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDao.java @@ -37,7 +37,6 @@ import org.apache.logging.log4j.util.Strings; import org.opensearch.OpenSearchException; import org.opensearch.SpecialPermission; -import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.bulk.BulkRequest; @@ -62,7 +61,6 @@ import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.shared.Constants; import org.opensearch.geospatial.shared.StashedThreadContext; -import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilders; /** @@ -242,94 +240,26 @@ public XContentBuilder createDocument(final String[] fields, final String[] valu * * @param indexName index * @param ip ip address - * @param actionListener action listener + * @return geoIP data */ - public void getGeoIpData(final String indexName, final String ip, final ActionListener> actionListener) { - StashedThreadContext.run( + public Map getGeoIpData(final String indexName, final String ip) { + SearchResponse response = StashedThreadContext.run( client, () -> client.prepareSearch(indexName) .setSize(1) .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) .setPreference("_local") .setRequestCache(true) - .execute(new ActionListener<>() { - @Override - public void onResponse(final SearchResponse searchResponse) { - try { - if (searchResponse.getHits().getHits().length == 0) { - actionListener.onResponse(Collections.emptyMap()); - } else { - Map geoIpData = (Map) XContentHelper.convertToMap( - searchResponse.getHits().getAt(0).getSourceRef(), - false, - XContentType.JSON - ).v2().get(DATA_FIELD_NAME); - actionListener.onResponse(geoIpData); - } - } catch (Exception e) { - actionListener.onFailure(e); - } - } - - @Override - public void onFailure(final Exception e) { - actionListener.onFailure(e); - } - }) + .get(TimeValue.timeValueSeconds(30)) ); - } - /** - * Query a given index using a given list of ip addresses to get geoip data - * - * @param indexName index - * @param ips list of ip addresses - * @param actionListener action listener - */ - public void getGeoIpData( - final String indexName, - final List ips, - final ActionListener>> actionListener - ) { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - ips.stream().forEach(ip -> boolQueryBuilder.should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip))); - StashedThreadContext.run( - client, - () -> client.prepareSearch(indexName) - .setSize(ips.size()) - .setQuery(boolQueryBuilder) - .setPreference("_local") - .setRequestCache(true) - .execute(new ActionListener<>() { - @Override - public void onResponse(final SearchResponse searchResponse) { - try { - actionListener.onResponse(toGeoIpDataList(searchResponse)); - } catch (Exception e) { - actionListener.onFailure(e); - } - } - - @Override - public void onFailure(final Exception e) { - actionListener.onFailure(e); - } - }) - ); - } - - private List> toGeoIpDataList(final SearchResponse searchResponse) { - if (searchResponse.getHits().getHits().length == 0) { - return Collections.emptyList(); + if (response.getHits().getHits().length == 0) { + return Collections.emptyMap(); + } else { + return (Map) XContentHelper.convertToMap(response.getHits().getAt(0).getSourceRef(), false, XContentType.JSON) + .v2() + .get(DATA_FIELD_NAME); } - - return Arrays.stream(searchResponse.getHits().getHits()) - .map( - data -> (Map) XContentHelper.convertToMap(data.getSourceRef(), false, XContentType.JSON) - .v2() - .get(DATA_FIELD_NAME) - ) - .collect(Collectors.toList()); } /** diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java b/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java index 23e98279..b1b514e9 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDao.java @@ -18,6 +18,7 @@ import lombok.Getter; import lombok.extern.log4j.Log4j2; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.cache.Cache; import org.opensearch.common.cache.CacheBuilder; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -26,7 +27,9 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.ShardId; @@ -37,52 +40,70 @@ @Log4j2 public class Ip2GeoCachedDao implements IndexingOperationListener { private final DatasourceDao datasourceDao; - private Map data; + private final GeoIpDataDao geoIpDataDao; + private final GeoDataCache geoDataCache; + private Map metadata; - public Ip2GeoCachedDao(final DatasourceDao datasourceDao) { + public Ip2GeoCachedDao(final ClusterService clusterService, final DatasourceDao datasourceDao, final GeoIpDataDao geoIpDataDao) { this.datasourceDao = datasourceDao; + this.geoIpDataDao = geoIpDataDao; + this.geoDataCache = new GeoDataCache(clusterService.getClusterSettings().get(Ip2GeoSettings.CACHE_SIZE)); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(Ip2GeoSettings.CACHE_SIZE, setting -> this.geoDataCache.updateMaxSize(setting.longValue())); } public String getIndexName(final String datasourceName) { - return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName(); + return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getIndexName(); } public boolean isExpired(final String datasourceName) { - return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now()); + return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getExpirationDate().isBefore(Instant.now()); } public boolean has(final String datasourceName) { - return getData().containsKey(datasourceName); + return getMetadata().containsKey(datasourceName); } public DatasourceState getState(final String datasourceName) { - return getData().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState(); + return getMetadata().getOrDefault(datasourceName, DatasourceMetadata.EMPTY_METADATA).getState(); } - private Map getData() { - if (data != null) { - return data; + public Map getGeoData(final String indexName, final String ip) { + try { + return geoDataCache.putIfAbsent(indexName, ip, addr -> geoIpDataDao.getGeoIpData(indexName, ip)); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private Map getMetadata() { + if (metadata != null) { + return metadata; } synchronized (this) { - if (data != null) { - return data; + if (metadata != null) { + return metadata; } Map tempData = new ConcurrentHashMap<>(); - datasourceDao.getAllDatasources() - .stream() - .forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource))); - data = tempData; - return data; + try { + datasourceDao.getAllDatasources() + .stream() + .forEach(datasource -> tempData.put(datasource.getName(), new DatasourceMetadata(datasource))); + } catch (IndexNotFoundException e) { + log.debug("Datasource has never been created"); + } + metadata = tempData; + return metadata; } } private void put(final Datasource datasource) { DatasourceMetadata metadata = new DatasourceMetadata(datasource); - getData().put(datasource.getName(), metadata); + getMetadata().put(datasource.getName(), metadata); } private void remove(final String datasourceName) { - getData().remove(datasourceName); + getMetadata().remove(datasourceName); } @Override diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 356869fe..56100c0b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -21,9 +21,7 @@ import lombok.Getter; import lombok.extern.log4j.Log4j2; -import org.opensearch.action.ActionListener; import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.geospatial.annotation.VisibleForTesting; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.dao.DatasourceDao; import org.opensearch.geospatial.ip2geo.dao.GeoIpDataDao; @@ -147,8 +145,7 @@ public IngestDocument execute(IngestDocument ingestDocument) { throw new IllegalStateException("Not implemented"); } - @VisibleForTesting - protected void executeInternal( + private void executeInternal( final IngestDocument ingestDocument, final BiConsumer handler, final String ip @@ -160,36 +157,11 @@ protected void executeInternal( return; } - try { - geoIpDataDao.getGeoIpData(indexName, ip, getSingleGeoIpDataListener(ingestDocument, handler)); - } catch (Exception e) { - handler.accept(null, e); + Map geoData = ip2GeoCachedDao.getGeoData(indexName, ip); + if (geoData.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, filteredGeoData(geoData)); } - } - - @VisibleForTesting - protected ActionListener> getSingleGeoIpDataListener( - final IngestDocument ingestDocument, - final BiConsumer handler - ) { - return new ActionListener<>() { - @Override - public void onResponse(final Map ipToGeoData) { - try { - if (ipToGeoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData)); - } - handler.accept(ingestDocument, null); - } catch (Exception e) { - handler.accept(null, e); - } - } - - @Override - public void onFailure(final Exception e) { - handler.accept(null, e); - } - }; + handler.accept(ingestDocument, null); } private Map filteredGeoData(final Map geoData) { @@ -200,14 +172,6 @@ private Map filteredGeoData(final Map geoData) { return properties.stream().filter(p -> geoData.containsKey(p)).collect(Collectors.toMap(p -> p, p -> geoData.get(p))); } - private List> filteredGeoData(final List> geoData) { - if (properties == null) { - return geoData; - } - - return geoData.stream().map(this::filteredGeoData).collect(Collectors.toList()); - } - private void validateDatasourceIsInAvailableState(final String datasourceName) { if (ip2GeoCachedDao.has(datasourceName) == false) { throw new IllegalStateException("datasource does not exist"); @@ -230,8 +194,7 @@ private void handleExpiredData(final IngestDocument ingestDocument, final BiCons * @param handler the handler * @param ips the ip list */ - @VisibleForTesting - protected void executeInternal( + private void executeInternal( final IngestDocument ingestDocument, final BiConsumer handler, final List ips @@ -249,32 +212,16 @@ protected void executeInternal( return; } - geoIpDataDao.getGeoIpData(indexName, (List) ips, getMultiGeoIpDataListener(ingestDocument, handler)); - } - - @VisibleForTesting - protected ActionListener>> getMultiGeoIpDataListener( - final IngestDocument ingestDocument, - final BiConsumer handler - ) { - return new ActionListener<>() { - @Override - public void onResponse(final List> ipToGeoData) { - try { - if (ipToGeoData.isEmpty() == false) { - ingestDocument.setFieldValue(targetField, filteredGeoData(ipToGeoData)); - } - handler.accept(ingestDocument, null); - } catch (Exception e) { - handler.accept(null, e); - } - } + List> geoDataList = ips.stream() + .map(ip -> ip2GeoCachedDao.getGeoData(indexName, (String) ip)) + .filter(geoData -> geoData.isEmpty() == false) + .map(this::filteredGeoData) + .collect(Collectors.toList()); - @Override - public void onFailure(final Exception e) { - handler.accept(null, e); - } - }; + if (geoDataList.isEmpty() == false) { + ingestDocument.setFieldValue(targetField, geoDataList); + } + handler.accept(ingestDocument, null); } @Override diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 942ce65c..46a56b60 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -107,7 +107,7 @@ public Collection getSystemIndexDescriptors(Settings sett public Map getProcessors(Processor.Parameters parameters) { this.datasourceDao = new DatasourceDao(parameters.client, parameters.ingestService.getClusterService()); this.geoIpDataDao = new GeoIpDataDao(parameters.ingestService.getClusterService(), parameters.client); - this.ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao); + this.ip2GeoCachedDao = new Ip2GeoCachedDao(parameters.ingestService.getClusterService(), datasourceDao, geoIpDataDao); return MapBuilder.newMapBuilder() .put(FeatureProcessor.TYPE, new FeatureProcessor.Factory()) .put(Ip2GeoProcessor.TYPE, new Ip2GeoProcessor.Factory(parameters.ingestService, datasourceDao, geoIpDataDao, ip2GeoCachedDao)) diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java index 8bd84924..5ca4ac34 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java @@ -7,6 +7,8 @@ import lombok.SneakyThrows; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.Randomness; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.geospatial.GeospatialTestHelper; @@ -27,4 +29,16 @@ public void testStreamInOut_whenValidInput_thenSucceed() { // Verify assertEquals(request.getName(), copiedRequest.getName()); } + + public void testValidate_whenNullOrBlank_thenError() { + String datasourceName = Randomness.get().nextInt() % 2 == 0 ? null : " "; + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasourceName); + + // Run + ActionRequestValidationException error = request.validate(); + + // Verify + assertNotNull(error.validationErrors()); + assertFalse(error.validationErrors().isEmpty()); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java index 09f1b67c..6b61e6e2 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java @@ -5,6 +5,7 @@ package org.opensearch.geospatial.ip2geo.action; +import org.opensearch.action.ActionRequestValidationException; import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.geospatial.GeospatialTestHelper; @@ -40,4 +41,15 @@ public void testStreamInOut_whenNames_thenSucceed() throws Exception { // Verify assertArrayEquals(request.getNames(), copiedRequest.getNames()); } + + public void testValidate_whenNull_thenError() { + GetDatasourceRequest request = new GetDatasourceRequest((String[]) null); + + // Run + ActionRequestValidationException error = request.validate(); + + // Verify + assertNotNull(error.validationErrors()); + assertFalse(error.validationErrors().isEmpty()); + } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java index b7732a31..581fd3b8 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java @@ -15,6 +15,7 @@ import java.util.List; import org.junit.Before; +import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; @@ -56,6 +57,18 @@ public void testDoExecute_whenNames_thenSucceed() { verify(datasourceDao).getDatasources(eq(datasourceNames), any(ActionListener.class)); } + public void testDoExecute_whenNull_thenException() { + Task task = mock(Task.class); + GetDatasourceRequest request = new GetDatasourceRequest((String[]) null); + ActionListener listener = mock(ActionListener.class); + + // Run + Exception exception = expectThrows(OpenSearchException.class, () -> action.doExecute(task, request, listener)); + + // Verify + assertTrue(exception.getMessage().contains("should not be null")); + } + public void testNewActionListener_whenOnResponse_thenSucceed() { List datasources = Arrays.asList(randomDatasource(), randomDatasource()); ActionListener actionListener = mock(ActionListener.class); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java index 13872a27..45380aa3 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/GeoIpDataDaoTests.java @@ -19,9 +19,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; -import java.util.Arrays; import java.util.Iterator; -import java.util.List; import java.util.Locale; import java.util.Map; @@ -32,9 +30,7 @@ import org.apache.commons.csv.CSVRecord; import org.apache.lucene.search.TotalHits; import org.junit.Before; -import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; -import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.forcemerge.ForceMergeRequest; @@ -108,6 +104,18 @@ public void testCreateDocument_whenBlankValue_thenDoNotAdd() { ); } + @SneakyThrows + public void testCreateDocument_whenFieldsAndValuesLengthDoesNotMatch_thenThrowException() { + String[] names = { "ip", "country", "location", "city" }; + String[] values = { "1.0.0.0/25", "USA", " " }; + + // Run + Exception e = expectThrows(OpenSearchException.class, () -> noOpsGeoIpDataDao.createDocument(names, values)); + + // Verify + assertTrue(e.getMessage().contains("does not match")); + } + public void testGetDatabaseReader() throws Exception { File zipFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.zip").getFile()); DatasourceManifest manifest = new DatasourceManifest( @@ -222,7 +230,7 @@ public void testPutGeoIpData_whenValidInput_thenSucceed() { } } - public void testGetGeoIpData_whenSingleIp_thenSucceed() { + public void testGetGeoIpData_whenDataExist_thenReturnTheData() { String indexName = GeospatialTestHelper.randomLowerCaseString(); String ip = randomIpAddress(); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { @@ -247,14 +255,15 @@ public void testGetGeoIpData_whenSingleIp_thenSucceed() { when(response.getHits()).thenReturn(searchHits); return response; }); - ActionListener> listener = mock(ActionListener.class); - verifyingGeoIpDataDao.getGeoIpData(indexName, ip, listener); - ArgumentCaptor> captor = ArgumentCaptor.forClass(Map.class); - verify(listener).onResponse(captor.capture()); - assertEquals("seattle", captor.getValue().get("city")); + + // Run + Map geoData = verifyingGeoIpDataDao.getGeoIpData(indexName, ip); + + // Verify + assertEquals("seattle", geoData.get("city")); } - public void testGetGeoIpData_whenMultiIps_thenSucceed() { + public void testGetGeoIpData_whenNoData_thenReturnEmpty() { String indexName = GeospatialTestHelper.randomLowerCaseString(); String ip = randomIpAddress(); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { @@ -262,27 +271,20 @@ public void testGetGeoIpData_whenMultiIps_thenSucceed() { SearchRequest request = (SearchRequest) actionRequest; assertEquals("_local", request.preference()); assertEquals(1, request.source().size()); - assertEquals(QueryBuilders.boolQuery().should(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)), request.source().query()); + assertEquals(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip), request.source().query()); - String data = String.format( - Locale.ROOT, - "{\"%s\":\"1.0.0.1/16\",\"%s\":{\"city\":\"seattle\"}}", - IP_RANGE_FIELD_NAME, - DATA_FIELD_NAME - ); - SearchHit searchHit = new SearchHit(1); - searchHit.sourceRef(BytesReference.fromByteBuffer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)))); - SearchHit[] searchHitArray = { searchHit }; - SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(1l, TotalHits.Relation.EQUAL_TO), 1); + SearchHit[] searchHitArray = {}; + SearchHits searchHits = new SearchHits(searchHitArray, new TotalHits(0l, TotalHits.Relation.EQUAL_TO), 0); SearchResponse response = mock(SearchResponse.class); when(response.getHits()).thenReturn(searchHits); return response; }); - ActionListener>> listener = mock(ActionListener.class); - verifyingGeoIpDataDao.getGeoIpData(indexName, Arrays.asList(ip), listener); - ArgumentCaptor>> captor = ArgumentCaptor.forClass(List.class); - verify(listener).onResponse(captor.capture()); - assertEquals("seattle", captor.getValue().get(0).get("city")); + + // Run + Map geoData = verifyingGeoIpDataDao.getGeoIpData(indexName, ip); + + // Verify + assertTrue(geoData.isEmpty()); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java index 4906fba9..c02a9cd1 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/dao/Ip2GeoCachedDaoTests.java @@ -13,6 +13,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import lombok.SneakyThrows; @@ -24,6 +25,7 @@ import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.engine.Engine; import org.opensearch.index.shard.ShardId; @@ -32,7 +34,7 @@ public class Ip2GeoCachedDaoTests extends Ip2GeoTestCase { @Before public void init() { - ip2GeoCachedDao = new Ip2GeoCachedDao(datasourceDao); + ip2GeoCachedDao = new Ip2GeoCachedDao(clusterService, datasourceDao, geoIpDataDao); } public void testGetIndexName_whenCalled_thenReturnIndexName() { @@ -46,6 +48,16 @@ public void testGetIndexName_whenCalled_thenReturnIndexName() { assertEquals(datasource.currentIndexName(), indexName); } + public void testGetIndexName_whenIndexNotFound_thenReturnNull() { + when(datasourceDao.getAllDatasources()).thenThrow(new IndexNotFoundException("not found")); + + // Run + String indexName = ip2GeoCachedDao.getIndexName(GeospatialTestHelper.randomLowerCaseString()); + + // Verify + assertNull(indexName); + } + public void testIsExpired_whenExpired_thenReturnTrue() { Datasource datasource = randomDatasource(); datasource.getUpdateStats().setLastSucceededAt(Instant.MIN); @@ -106,6 +118,19 @@ public void testGetState_whenCalled_thenReturnState() { assertEquals(datasource.getState(), state); } + public void testGetGeoData_whenCalled_thenReturnGeoData() { + Datasource datasource = randomDatasource(); + String ip = NetworkAddress.format(randomIp(false)); + Map expectedGeoData = Map.of("city", "Seattle"); + when(geoIpDataDao.getGeoIpData(datasource.currentIndexName(), ip)).thenReturn(expectedGeoData); + + // Run + Map geoData = ip2GeoCachedDao.getGeoData(datasource.currentIndexName(), ip); + + // Verify + assertEquals(expectedGeoData, geoData); + } + @SneakyThrows public void testPostIndex_whenFailed_thenNoUpdate() { when(datasourceDao.getAllDatasources()).thenReturn(Arrays.asList()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java index 1800cdb6..d4c460c4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunnerTests.java @@ -41,6 +41,10 @@ public void init() { .initialize(clusterService, datasourceUpdateService, ip2GeoExecutor, datasourceDao, ip2GeoLockService); } + public void testGetJobRunnerInstance_whenCalledAgain_thenReturnSameInstance() { + assertTrue(DatasourceRunner.getJobRunnerInstance() == DatasourceRunner.getJobRunnerInstance()); + } + public void testRunJob_whenInvalidClass_thenThrowException() { JobDocVersion jobDocVersion = new JobDocVersion(randomInt(), randomInt(), randomInt()); String jobIndexName = randomLowerCaseString(); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java index 700fe56b..82233c66 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorTests.java @@ -6,10 +6,10 @@ package org.opensearch.geospatial.ip2geo.processor; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; -import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -24,7 +24,6 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; -import org.opensearch.action.ActionListener; import org.opensearch.common.Randomness; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -34,8 +33,6 @@ public class Ip2GeoProcessorTests extends Ip2GeoTestCase { private static final String DEFAULT_TARGET_FIELD = "ip2geo"; - private static final String CONFIG_DATASOURCE_KEY = "datasource"; - private static final String CONFIG_FIELD_KEY = "field"; private static final List SUPPORTED_FIELDS = Arrays.asList("city", "country"); private Ip2GeoProcessor.Factory factory; @@ -112,134 +109,136 @@ public void testExecute_whenNoDatasource_thenNotExistError() { public void testExecute_whenExpired_thenExpiredMsg() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); + BiConsumer handler = mock(BiConsumer.class); - Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); - - String index = GeospatialTestHelper.randomLowerCaseString(); - when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(index); + String indexName = GeospatialTestHelper.randomLowerCaseString(); + when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(indexName); when(ip2GeoCachedDao.has(datasourceName)).thenReturn(true); - when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(true); when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.AVAILABLE); + when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(true); + Map geoData = Map.of("city", "Seattle", "country", "USA"); + when(ip2GeoCachedDao.getGeoData(eq(indexName), any())).thenReturn(geoData); - BiConsumer handler = mock(BiConsumer.class); + // Run for single ip + String ip = randomIpAddress(); + IngestDocument documentWithIp = createDocument(ip); + processor.execute(documentWithIp, handler); - // Run - processor.execute(document, handler); + // Verify + verify(handler).accept(documentWithIp, null); + assertEquals("ip2geo_data_expired", documentWithIp.getFieldValue(DEFAULT_TARGET_FIELD + ".error", String.class)); + + // Run for multi ips + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + IngestDocument documentWithIps = createDocument(ips); + processor.execute(documentWithIps, handler); // Verify - verify(handler).accept(document, null); - assertEquals("ip2geo_data_expired", document.getFieldValue(DEFAULT_TARGET_FIELD + ".error", String.class)); + verify(handler).accept(documentWithIps, null); + assertEquals("ip2geo_data_expired", documentWithIp.getFieldValue(DEFAULT_TARGET_FIELD + ".error", String.class)); } @SneakyThrows - public void testExecuteInternal_whenSingleIp_thenGetDatasourceIsCalled() { + public void testExecute_whenNotAvailable_thenException() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); String indexName = GeospatialTestHelper.randomLowerCaseString(); when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(indexName); when(ip2GeoCachedDao.has(datasourceName)).thenReturn(true); - when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.AVAILABLE); + when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.CREATE_FAILED); when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(false); + Map geoData = Map.of("city", "Seattle", "country", "USA"); + when(ip2GeoCachedDao.getGeoData(eq(indexName), any())).thenReturn(geoData); - // Run - processor.executeInternal(document, handler, ip); + // Run for single ip + String ip = randomIpAddress(); + IngestDocument documentWithIp = createDocument(ip); + processor.execute(documentWithIp, handler); + + // Run for multi ips + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + IngestDocument documentWithIps = createDocument(ips); + processor.execute(documentWithIps, handler); // Verify - verify(geoIpDataDao).getGeoIpData(anyString(), anyString(), any(ActionListener.class)); + ArgumentCaptor captor = ArgumentCaptor.forClass(IllegalStateException.class); + verify(handler, times(2)).accept(isNull(), captor.capture()); + assertTrue(captor.getAllValues().stream().allMatch(e -> e.getMessage().contains("not in an available state"))); } @SneakyThrows - public void testGetSingleGeoIpDataListener_whenNoPropertySet_thenAddAllProperties() { + public void testExecute_whenCalled_thenGeoIpDataIsAdded() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); - Map geoIpData = Map.of("city", "Seattle", "country", "USA"); - // Run - processor.getSingleGeoIpDataListener(document, handler).onResponse(geoIpData); - - // Verify - assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".city", String.class)); - assertEquals("USA", document.getFieldValue(DEFAULT_TARGET_FIELD + ".country", String.class)); - verify(handler).accept(document, null); - } + String indexName = GeospatialTestHelper.randomLowerCaseString(); + when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(indexName); + when(ip2GeoCachedDao.has(datasourceName)).thenReturn(true); + when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.AVAILABLE); + when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(false); + Map geoData = Map.of("city", "Seattle", "country", "USA"); + when(ip2GeoCachedDao.getGeoData(eq(indexName), any())).thenReturn(geoData); - @SneakyThrows - public void testGetSingleGeoIpDataListener_whenPropertySet_thenAddOnlyTheProperties() { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("properties", Arrays.asList("city"))); - Map source = new HashMap<>(); + // Run for single ip String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); + IngestDocument documentWithIp = createDocument(ip); + processor.execute(documentWithIp, handler); - Map geoIpData = Map.of("city", "Seattle", "country", "USA"); - // Run - processor.getSingleGeoIpDataListener(document, handler).onResponse(geoIpData); + // Verify + assertEquals(geoData.get("city"), documentWithIp.getFieldValue("ip2geo.city", String.class)); + assertEquals(geoData.get("country"), documentWithIp.getFieldValue("ip2geo.country", String.class)); + + // Run for multi ips + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + IngestDocument documentWithIps = createDocument(ips); + processor.execute(documentWithIps, handler); // Verify - assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".city", String.class)); - assertFalse(document.hasField(DEFAULT_TARGET_FIELD + ".country")); - verify(handler).accept(document, null); + assertEquals(2, documentWithIps.getFieldValue("ip2geo", List.class).size()); + Map addedValue = (Map) documentWithIps.getFieldValue("ip2geo", List.class).get(0); + assertEquals(geoData.get("city"), addedValue.get("city")); + assertEquals(geoData.get("country"), addedValue.get("country")); } @SneakyThrows - public void testGetMultiGeoIpDataListener_whenNoPropertySet_thenAddAllProperties() { + public void testExecute_whenPropertiesSet_thenFilteredGeoIpDataIsAdded() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); + Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of(Ip2GeoProcessor.CONFIG_PROPERTIES, Arrays.asList("country"))); BiConsumer handler = mock(BiConsumer.class); - Map geoIpData = Map.of("city", "Seattle", "country", "USA"); - // Run - processor.getMultiGeoIpDataListener(document, handler).onResponse(Arrays.asList(geoIpData)); - - // Verify - assertEquals(1, document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).size()); - assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.city", String.class)); - assertEquals("USA", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.country", String.class)); - verify(handler).accept(document, null); - } + String indexName = GeospatialTestHelper.randomLowerCaseString(); + when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(indexName); + when(ip2GeoCachedDao.has(datasourceName)).thenReturn(true); + when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.AVAILABLE); + when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(false); + Map geoData = Map.of("city", "Seattle", "country", "USA"); + when(ip2GeoCachedDao.getGeoData(eq(indexName), any())).thenReturn(geoData); - @SneakyThrows - public void testGetMultiGeoIpDataListener_whenPropertySet_thenAddOnlyTheProperties() { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Map.of("properties", Arrays.asList("city"))); - Map source = new HashMap<>(); + // Run for single ip String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); - BiConsumer handler = mock(BiConsumer.class); + IngestDocument documentWithIp = createDocument(ip); + processor.execute(documentWithIp, handler); - Map geoIpData = Map.of("city", "Seattle", "country", "USA"); - // Run - processor.getMultiGeoIpDataListener(document, handler).onResponse(Arrays.asList(geoIpData)); + // Verify + assertFalse(documentWithIp.hasField("ip2geo.city")); + assertEquals(geoData.get("country"), documentWithIp.getFieldValue("ip2geo.country", String.class)); + + // Run for multi ips + List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); + IngestDocument documentWithIps = createDocument(ips); + processor.execute(documentWithIps, handler); // Verify - assertEquals(1, document.getFieldValue(DEFAULT_TARGET_FIELD, List.class).size()); - assertEquals("Seattle", document.getFieldValue(DEFAULT_TARGET_FIELD + ".0.city", String.class)); - assertFalse(document.hasField(DEFAULT_TARGET_FIELD + ".0.country")); - verify(handler).accept(document, null); + assertEquals(2, documentWithIps.getFieldValue("ip2geo", List.class).size()); + Map addedValue = (Map) documentWithIps.getFieldValue("ip2geo", List.class).get(0); + assertFalse(addedValue.containsKey("city")); + assertEquals(geoData.get("country"), addedValue.get("country")); } - public void testExecuteNotImplemented() throws Exception { + public void testExecute_whenNoHandler_thenException() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); IngestDocument document = new IngestDocument(Collections.emptyMap(), Collections.emptyMap()); @@ -247,42 +246,22 @@ public void testExecuteNotImplemented() throws Exception { assertTrue(e.getMessage().contains("Not implemented")); } - public void testExecuteInternalNonStringIp() throws Exception { + public void testExecute_whenContainsNonString_thenException() throws Exception { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); List ips = Arrays.asList(randomIpAddress(), 1); Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); - IngestDocument document = new IngestDocument(source, new HashMap<>()); - - BiConsumer handler = mock(BiConsumer.class); - Exception e = expectThrows(IllegalArgumentException.class, () -> processor.executeInternal(document, handler, ips)); - assertTrue(e.getMessage().contains("should only contain strings")); - } - - @SneakyThrows - public void testExecuteInternal_whenMultiIps_thenGetDatasourceIsCalled() { - String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Ip2GeoProcessor processor = createProcessor(datasourceName, Collections.emptyMap()); - Map source = new HashMap<>(); - String ip = randomIpAddress(); - source.put("ip", ip); + source.put("ip", ips); IngestDocument document = new IngestDocument(source, new HashMap<>()); BiConsumer handler = mock(BiConsumer.class); - List ips = Arrays.asList(randomIpAddress(), randomIpAddress()); - - String indexName = GeospatialTestHelper.randomLowerCaseString(); - when(ip2GeoCachedDao.getIndexName(datasourceName)).thenReturn(indexName); - when(ip2GeoCachedDao.has(datasourceName)).thenReturn(true); - when(ip2GeoCachedDao.getState(datasourceName)).thenReturn(DatasourceState.AVAILABLE); - when(ip2GeoCachedDao.isExpired(datasourceName)).thenReturn(false); // Run - processor.executeInternal(document, handler, ips); + processor.execute(document, handler); // Verify - verify(geoIpDataDao).getGeoIpData(anyString(), anyList(), any(ActionListener.class)); + ArgumentCaptor captor = ArgumentCaptor.forClass(IllegalArgumentException.class); + verify(handler).accept(isNull(), captor.capture()); + assertTrue(captor.getValue().getMessage().contains("should only contain strings")); } private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { @@ -296,8 +275,8 @@ private Ip2GeoProcessor createProcessor(final String datasourceName, final Map config) throws Exception { when(datasourceDao.getDatasource(datasource.getName())).thenReturn(datasource); Map baseConfig = new HashMap<>(); - baseConfig.put(CONFIG_FIELD_KEY, "ip"); - baseConfig.put(CONFIG_DATASOURCE_KEY, datasource.getName()); + baseConfig.put(Ip2GeoProcessor.CONFIG_FIELD, "ip"); + baseConfig.put(Ip2GeoProcessor.CONFIG_DATASOURCE, datasource.getName()); baseConfig.putAll(config); return factory.create( @@ -307,4 +286,16 @@ private Ip2GeoProcessor createProcessor(final Datasource datasource, final Map source = new HashMap<>(); + source.put("ip", ip); + return new IngestDocument(source, new HashMap<>()); + } + + private IngestDocument createDocument(List ips) { + Map source = new HashMap<>(); + source.put("ip", ips); + return new IngestDocument(source, new HashMap<>()); + } }