From ca52c84b9954ce048034352c8c3fda92c92b1221 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Fri, 12 May 2023 14:12:15 -0700 Subject: [PATCH] Add integration test for Ip2GeoProcessor Signed-off-by: Heemin Kim --- .../exceptions/ResourceInUseException.java | 35 ++++ .../DeleteDatasourceTransportAction.java | 5 +- .../ip2geo/action/PutDatasourceRequest.java | 4 +- .../geospatial/GeospatialRestTestCase.java | 87 ++++++++- .../geospatial/ip2geo/Ip2GeoDataServer.java | 127 +++++++++++++ .../geospatial/ip2geo/Ip2GeoTestCase.java | 4 +- .../UpdateDatasourceTransportActionTests.java | 4 +- .../ip2geo/processor/Ip2GeoProcessorIT.java | 169 +++++++++++++++++- .../processor/FeatureProcessorIT.java | 2 +- .../resources/ip2geo/server/city/city.zip | Bin 0 -> 294 bytes .../ip2geo/server/city/manifest.json | 8 + .../ip2geo/server/city/manifest_local.json | 8 + .../ip2geo/server/country/country.zip | Bin 0 -> 263 bytes .../ip2geo/server/country/manifest.json | 8 + .../ip2geo/server/country/manifest_local.json | 8 + 15 files changed, 456 insertions(+), 13 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/exceptions/ResourceInUseException.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java create mode 100644 src/test/resources/ip2geo/server/city/city.zip create mode 100644 src/test/resources/ip2geo/server/city/manifest.json create mode 100644 src/test/resources/ip2geo/server/city/manifest_local.json create mode 100644 src/test/resources/ip2geo/server/country/country.zip create mode 100644 src/test/resources/ip2geo/server/country/manifest.json create mode 100644 src/test/resources/ip2geo/server/country/manifest_local.json diff --git a/src/main/java/org/opensearch/geospatial/exceptions/ResourceInUseException.java b/src/main/java/org/opensearch/geospatial/exceptions/ResourceInUseException.java new file mode 100644 index 00000000..c606abea --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/exceptions/ResourceInUseException.java @@ -0,0 +1,35 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.exceptions; + +import java.io.IOException; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.rest.RestStatus; + +/** + * Generic ResourceInUseException corresponding to the {@link RestStatus#BAD_REQUEST} status code + */ +public class ResourceInUseException extends OpenSearchException { + + public ResourceInUseException(String msg, Object... args) { + super(msg, args); + } + + public ResourceInUseException(String msg, Throwable cause, Object... args) { + super(msg, cause, args); + } + + public ResourceInUseException(StreamInput in) throws IOException { + super(in); + } + + @Override + public final RestStatus status() { + return RestStatus.BAD_REQUEST; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index 27055d41..753c107d 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -17,6 +17,7 @@ import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.exceptions.ResourceInUseException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; @@ -105,7 +106,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException private void setDatasourceStateAsDeleting(final Datasource datasource) { if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) { - throw new OpenSearchException("datasource is being used by one of processors"); + throw new ResourceInUseException("datasource is being used by one of processors"); } DatasourceState previousState = datasource.getState(); @@ -119,7 +120,7 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) { if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) { datasource.setState(previousState); datasourceFacade.updateDatasource(datasource); - throw new OpenSearchException("datasource is being used by one of processors"); + throw new ResourceInUseException("datasource is being used by one of processors"); } } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java index 3347a243..3426008b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -36,9 +36,9 @@ @Log4j2 @EqualsAndHashCode public class PutDatasourceRequest extends AcknowledgedRequest { - private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); - private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); private static final int MAX_DATASOURCE_NAME_BYTES = 255; + public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); /** * @param name the datasource name * @return the datasource name diff --git a/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java b/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java index 89685c9f..f9449a0e 100644 --- a/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java +++ b/src/test/java/org/opensearch/geospatial/GeospatialRestTestCase.java @@ -17,9 +17,12 @@ import static org.opensearch.search.aggregations.Aggregations.AGGREGATIONS_FIELD; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.stream.IntStream; @@ -46,12 +49,12 @@ import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONRequestContent; import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper; import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; import org.opensearch.ingest.Pipeline; import org.opensearch.rest.RestStatus; public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCase { - public static final String SOURCE = "_source"; public static final String DOC = "_doc"; public static final String URL_DELIMITER = "/"; @@ -69,11 +72,23 @@ public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCas public static final String SHAPE_ID_FIELD = "id"; public static final String SHAPE_INDEX_PATH_FIELD = "path"; public static final String QUERY_PARAM_TOKEN = "?"; + private static final String SETTINGS = "_settings"; + private static final String SIMULATE = "_simulate"; + private static final String DOCS = "docs"; + private static final String DATASOURCES = "datasources"; + private static final String STATE = "state"; + private static final String PUT = "PUT"; + private static final String GET = "GET"; + private static final String DELETE = "DELETE"; private static String buildPipelinePath(String name) { return String.join(URL_DELIMITER, "_ingest", "pipeline", name); } + private static String buildDatasourcePath(String name) { + return String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name); + } + protected static Response createPipeline(String name, Optional description, List> processorConfigs) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); @@ -95,6 +110,74 @@ protected static void deletePipeline(String name) throws IOException { client().performRequest(request); } + protected Response createDatasource(final String name, Map properties) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + for (Map.Entry config : properties.entrySet()) { + builder.field(config.getKey(), config.getValue()); + } + builder.endObject(); + + Request request = new Request(PUT, buildDatasourcePath(name)); + request.setJsonEntity(Strings.toString(builder)); + return client().performRequest(request); + } + + protected void waitForDatasourceToBeAvailable(final String name, final Duration timeout) throws Exception { + Instant start = Instant.now(); + while (DatasourceState.AVAILABLE.equals(getDatasourceState(name)) == false) { + if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) { + throw new RuntimeException( + String.format( + Locale.ROOT, + "Datasource state didn't change to %s after %d seconds", + DatasourceState.AVAILABLE.name(), + timeout.toSeconds() + ) + ); + } + Thread.sleep(1000); + } + } + + private DatasourceState getDatasourceState(final String name) throws Exception { + List> datasources = (List>) getDatasource(name).get(DATASOURCES); + return DatasourceState.valueOf((String) datasources.get(0).get(STATE)); + } + + protected Response deleteDatasource(final String name) throws IOException { + Request request = new Request(DELETE, buildDatasourcePath(name)); + return client().performRequest(request); + } + + protected Map getDatasource(final String name) throws Exception { + Request request = new Request(GET, buildDatasourcePath(name)); + Response response = client().performRequest(request); + return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map(); + } + + protected Response updateDatasource(final String name, Map config) throws IOException { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + if (config != null && !config.isEmpty()) { + builder.value(config); + } + builder.endObject(); + + Request request = new Request(PUT, String.join(URL_DELIMITER, buildDatasourcePath(name), SETTINGS)); + request.setJsonEntity(Strings.toString(builder)); + return client().performRequest(request); + } + + protected Map simulatePipeline(final String name, List docs) throws Exception { + XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); + builder.field(DOCS, docs); + builder.endObject(); + + Request request = new Request(GET, String.join(URL_DELIMITER, buildPipelinePath(name), SIMULATE)); + request.setJsonEntity(Strings.toString(builder)); + Response response = client().performRequest(request); + return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map(); + } + protected static void createIndex(String name, Settings settings, Map fieldMap) throws IOException { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject(MAPPING_PROPERTIES_KEY); for (Map.Entry entry : fieldMap.entrySet()) { @@ -136,7 +219,7 @@ public static String indexDocument(String indexName, String docID, String body, return docID; } - protected Map buildProcessorConfig(final String processorType, final Map properties) { + protected Map buildProcessorConfig(final String processorType, final Map properties) { Map featureProcessor = new HashMap<>(); featureProcessor.put(processorType, properties); return featureProcessor; diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java new file mode 100644 index 00000000..ba1e4909 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java @@ -0,0 +1,127 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Paths; + +import lombok.SneakyThrows; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.common.SuppressForbidden; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; + +/** + * Simple http server to serve static files under test/java/resources/ip2geo/server for integration testing + */ +@Log4j2 +@SuppressForbidden(reason = "used only for testing") +public class Ip2GeoDataServer { + private static final String SYS_PROPERTY_KEY_CLUSTER_ENDPOINT = "tests.rest.cluster"; + private static final String LOCAL_CLUSTER_ENDPOINT = "127.0.0.1"; + private static final String ROOT = "ip2geo/server"; + private static final int PORT = 8001; + private static final String EXTERNAL_ENDPOINT_PREFIX = + "https://github.com/opensearch-project/geospatial/blob/main/src/test/resources/ip2geo/server"; + + private static HttpServer server; + private static volatile int counter = 0; + private static String endpointPrefix = "http://localhost:" + PORT; + private static String cityFilePath = endpointPrefix + "/city/manifest_local.json"; + private static String countryFilePath = endpointPrefix + "/country/manifest_local.json"; + + /** + * Return an endpoint to a manifest file for a sample city data + * The sample data should contain three lines as follows + * + * cidr,city,country + * 10.0.0.0/8,Seattle,USA + * 127.0.0.0/12,Vancouver,Canada + * fd12:2345:6789:1::/64,Bengaluru,India + * + */ + public static String getEndpointCity() { + return cityFilePath; + } + + /** + * Return an endpoint to a manifest file for a sample country data + * The sample data should contain three lines as follows + * + * cidr,country + * 10.0.0.0/8,USA + * 127.0.0.0/12,Canada + * fd12:2345:6789:1::/64,India + * + */ + public static String getEndpointCountry() { + return countryFilePath; + } + + @SneakyThrows + synchronized public static void start() { + log.info("Start server is called"); + // If it is remote cluster test, use external endpoint and do not launch local server + if (System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT).contains(LOCAL_CLUSTER_ENDPOINT) == false) { + log.info("Remote cluster[{}] testing. Skip launching local server", System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT)); + cityFilePath = EXTERNAL_ENDPOINT_PREFIX + "/city/manifest.json"; + countryFilePath = EXTERNAL_ENDPOINT_PREFIX + "/country/manifest.json"; + return; + } + + counter++; + if (server != null) { + log.info("Server has started already"); + return; + } + server = HttpServer.create(new InetSocketAddress("localhost", PORT), 0); + server.createContext("/", new Ip2GeoHttpHandler()); + server.start(); + log.info("Local file server started on port {}", PORT); + } + + synchronized public static void stop() { + log.info("Stop server is called"); + if (server == null) { + log.info("Server has stopped already"); + return; + } + counter--; + if (counter > 0) { + log.info("[{}] processors are still using the server", counter); + return; + } + + server.stop(0); + server = null; + log.info("Server stopped"); + } + + @SuppressForbidden(reason = "used only for testing") + private static class Ip2GeoHttpHandler implements HttpHandler { + @Override + public void handle(final HttpExchange exchange) throws IOException { + try { + byte[] data = Files.readAllBytes( + Paths.get(this.getClass().getClassLoader().getResource(ROOT + exchange.getRequestURI().getPath()).toURI()) + ); + exchange.sendResponseHeaders(200, data.length); + OutputStream outputStream = exchange.getResponseBody(); + outputStream.write(data); + outputStream.flush(); + outputStream.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index 707c43a9..4e76d6d4 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -179,11 +179,11 @@ protected long randomPositiveLong() { } protected Datasource randomDatasource() { - int validForInDays = Randomness.get().nextInt(30) + 2; + int validForInDays = Randomness.get().nextInt(30); Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setSchedule(new IntervalSchedule(now, validForInDays - 1, ChronoUnit.DAYS)); + datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS)); datasource.setState(randomState()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java index 5364a147..2d3a5602 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -25,6 +25,7 @@ import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.Randomness; import org.opensearch.common.unit.TimeValue; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; @@ -83,8 +84,9 @@ private void validateDoExecuteWithLockError(final Exception exception) { public void testDoExecute_whenValidInput_thenUpdate() { Datasource datasource = randomDatasource(); UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); - request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval() + 1)); request.setEndpoint(sampleManifestUrl()); + // Sample manifest has validForDays of 30. Update interval should be less than that. + request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29))); Task task = mock(Task.class); when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java index 34430d06..9c59039c 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessorIT.java @@ -6,20 +6,31 @@ package org.opensearch.geospatial.ip2geo.processor; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import lombok.SneakyThrows; import org.opensearch.client.Response; import org.opensearch.client.ResponseException; +import org.opensearch.common.Randomness; import org.opensearch.geospatial.GeospatialRestTestCase; import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoDataServer; +import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest; public class Ip2GeoProcessorIT extends GeospatialRestTestCase { + private static final String CITY = "city"; + private static final String COUNTRY = "country"; + private static final String IP = "ip"; + private static final String SOURCE = "_source"; @SneakyThrows public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() { @@ -33,22 +44,174 @@ public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() { // Verify assertTrue(exception.getMessage().contains("doesn't exist")); + assertEquals(400, exception.getResponse().getStatusLine().getStatusCode()); } - private Response createIp2GeoProcessorPipeline(final String pipelineName, final Map properties) throws IOException { + @SneakyThrows + public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() { + Ip2GeoDataServer.start(); + try { + String pipelineName = GeospatialTestHelper.randomLowerCaseString(); + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String targetField = GeospatialTestHelper.randomLowerCaseString(); + String field = GeospatialTestHelper.randomLowerCaseString(); + + Map datasourceProperties = Map.of( + PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), + Ip2GeoDataServer.getEndpointCity() + ); + + // Create datasource and wait for it to be available + createDatasource(datasourceName, datasourceProperties); + waitForDatasourceToBeAvailable(datasourceName, Duration.ofSeconds(10)); + + Map processorProperties = Map.of( + Ip2GeoProcessor.CONFIG_FIELD, + field, + Ip2GeoProcessor.CONFIG_DATASOURCE, + datasourceName, + Ip2GeoProcessor.CONFIG_TARGET_FIELD, + targetField, + Ip2GeoProcessor.CONFIG_PROPERTIES, + Arrays.asList(IP, CITY) + ); + + // Create ip2geo processor + createIp2GeoProcessorPipeline(pipelineName, processorProperties); + + Map> sampleData = getSampleData(); + List docs = sampleData.entrySet() + .stream() + .map(entry -> createDocument(field, entry.getKey())) + .collect(Collectors.toList()); + + // Simulate processor + Map response = simulatePipeline(pipelineName, docs); + + // Verify data added to document + List> sources = convertToListOfSources(response, targetField); + sources.stream().forEach(source -> { + assertFalse(source.containsKey(COUNTRY)); + assertEquals(sampleData.get(source.get(IP)).get(CITY), source.get(CITY)); + }); + + // Delete datasource fails when there is a process using it + ResponseException exception = expectThrows(ResponseException.class, () -> deleteDatasource(datasourceName)); + // Verify + assertEquals(400, exception.getResponse().getStatusLine().getStatusCode()); + + // Delete resources + deletePipeline(pipelineName); + deleteDatasource(datasourceName); + } finally { + Ip2GeoDataServer.stop(); + } + } + + private Response createIp2GeoProcessorPipeline(final String pipelineName, final Map properties) throws IOException { String field = GeospatialTestHelper.randomLowerCaseString(); String datasourceName = GeospatialTestHelper.randomLowerCaseString(); - Map defaultProperties = Map.of( + Map defaultProperties = Map.of( Ip2GeoProcessor.CONFIG_FIELD, field, Ip2GeoProcessor.CONFIG_DATASOURCE, datasourceName ); - Map baseProperties = new HashMap<>(); + Map baseProperties = new HashMap<>(); baseProperties.putAll(defaultProperties); baseProperties.putAll(properties); Map processorConfig = buildProcessorConfig(Ip2GeoProcessor.TYPE, baseProperties); return createPipeline(pipelineName, Optional.empty(), Arrays.asList(processorConfig)); } + + private Map> getSampleData() { + Map> sampleData = new HashMap<>(); + sampleData.put( + String.format( + Locale.ROOT, + "10.%d.%d.%d", + Randomness.get().nextInt(255), + Randomness.get().nextInt(255), + Randomness.get().nextInt(255) + ), + Map.of(CITY, "Seattle", COUNTRY, "USA") + ); + sampleData.put( + String.format( + Locale.ROOT, + "127.%d.%d.%d", + Randomness.get().nextInt(15), + Randomness.get().nextInt(255), + Randomness.get().nextInt(255) + ), + Map.of(CITY, "Vancouver", COUNTRY, "Canada") + ); + sampleData.put( + String.format( + Locale.ROOT, + "fd12:2345:6789:1:%x:%x:%x:%x", + Randomness.get().nextInt(65535), + Randomness.get().nextInt(65535), + Randomness.get().nextInt(65535), + Randomness.get().nextInt(65535) + ), + Map.of(CITY, "Bengaluru", COUNTRY, "India") + ); + return sampleData; + } + + private Map> createDocument(String... args) { + if (args.length % 2 == 1) { + throw new RuntimeException("Number of arguments should be even"); + } + Map source = new HashMap<>(); + for (int i = 0; i < args.length; i += 2) { + source.put(args[0], args[1]); + } + return Map.of(SOURCE, source); + } + + /** + * This method convert returned value of simulatePipeline method to a list of sources + * + * For example, + * Input: + * { + * "docs" : [ + * { + * "doc" : { + * "_index" : "_index", + * "_id" : "_id", + * "_source" : { + * "ip2geo" : { + * "ip" : "127.0.0.1", + * "city" : "Seattle" + * }, + * "_ip" : "127.0.0.1" + * }, + * "_ingest" : { + * "timestamp" : "2023-05-12T17:41:42.939703Z" + * } + * } + * } + * ] + * } + * + * Output: + * [ + * { + * "ip" : "127.0.0.1", + * "city" : "Seattle" + * } + * ] + * + */ + private List> convertToListOfSources(final Map data, final String targetField) { + List>> docs = (List>>) data.get("docs"); + return docs.stream() + .map(doc -> (Map>) doc.get("doc").get(SOURCE)) + .map(source -> source.get(targetField)) + .collect(Collectors.toList()); + } } diff --git a/src/test/java/org/opensearch/geospatial/processor/FeatureProcessorIT.java b/src/test/java/org/opensearch/geospatial/processor/FeatureProcessorIT.java index f2677cf3..94e2b3af 100644 --- a/src/test/java/org/opensearch/geospatial/processor/FeatureProcessorIT.java +++ b/src/test/java/org/opensearch/geospatial/processor/FeatureProcessorIT.java @@ -47,7 +47,7 @@ public void testIndexGeoJSONSuccess() throws Exception { Map geoFields = new HashMap<>(); geoFields.put(geoShapeField, "geo_shape"); - Map processorProperties = new HashMap<>(); + Map processorProperties = new HashMap<>(); processorProperties.put(FeatureProcessor.FIELD_KEY, geoShapeField); Map geoJSONProcessorConfig = buildProcessorConfig(FeatureProcessor.TYPE, processorProperties); List> configs = new ArrayList<>(); diff --git a/src/test/resources/ip2geo/server/city/city.zip b/src/test/resources/ip2geo/server/city/city.zip new file mode 100644 index 0000000000000000000000000000000000000000..12fbd7198d101bd245d5e531ccb8da49a0d423bf GIT binary patch literal 294 zcmWIWW@Zs#-~htL%r#*QP>>6xIT#ceQW8rN^^%LrLPK~N*eB18OPK=1AiA`In}Lz# z2O|RmP&WgE?x}-ZOo{>x4|kt8N=aU3w*A$W%9XztY~wKEUeuT$uk-P~-M@&S=7?V{ zJ9uB_YT3wrD``+Ies6lXc&C=hzxNprj()mrzw_YSR^ip$AI?Mn{5;tud;=MasNeUt*U59Gc8Z$>5&W`x_3U2 KP;l%5aTov(y;=YO literal 0 HcmV?d00001 diff --git a/src/test/resources/ip2geo/server/city/manifest.json b/src/test/resources/ip2geo/server/city/manifest.json new file mode 100644 index 00000000..de1e3f3b --- /dev/null +++ b/src/test/resources/ip2geo/server/city/manifest.json @@ -0,0 +1,8 @@ +{ + "url": "https://github.com/opensearch-project/geospatial/blob/main/src/test/resources/ip2geo/server/city/city.zip", + "db_name": "data.csv", + "sha256_hash": "oDPgEv+9+kNov7bdQQiLrhr8jQeEPdLnuJ22Hz5npvk=", + "valid_for_in_days": 30, + "updated_at_in_epoch_milli": 1683590400000, + "provider": "opensearch" +} diff --git a/src/test/resources/ip2geo/server/city/manifest_local.json b/src/test/resources/ip2geo/server/city/manifest_local.json new file mode 100644 index 00000000..a69ccbef --- /dev/null +++ b/src/test/resources/ip2geo/server/city/manifest_local.json @@ -0,0 +1,8 @@ +{ + "url": "http://localhost:8001/city/city.zip", + "db_name": "data.csv", + "sha256_hash": "oDPgEv+9+kNov7bdQQiLrhr8jQeEPdLnuJ22Hz5npvk=", + "valid_for_in_days": 30, + "updated_at_in_epoch_milli": 1683590400000, + "provider": "opensearch" +} diff --git a/src/test/resources/ip2geo/server/country/country.zip b/src/test/resources/ip2geo/server/country/country.zip new file mode 100644 index 0000000000000000000000000000000000000000..1c930b1a7441a3f6abed9d9ee089600522a93459 GIT binary patch literal 263 zcmWIWW@Zs#-~d8~sx@H@P~ZckIT#ceQW8rN^^%LrLPK~N*ca@IOHte%mja?oE4UdL zS$;4wFaUKkFnFKy(Y)k&PXDa7=Bg(~CKs5UYq{rMNO0G4RxZ}Kf5GFprJ=r|>O^hb zqZdw`I&tI3l~ZT#9J+Ms!YQx&XIzdO>#G`1^{lHNcyd4WynC2todx4dO5W0Fi4)?EnA( literal 0 HcmV?d00001 diff --git a/src/test/resources/ip2geo/server/country/manifest.json b/src/test/resources/ip2geo/server/country/manifest.json new file mode 100644 index 00000000..25460e5b --- /dev/null +++ b/src/test/resources/ip2geo/server/country/manifest.json @@ -0,0 +1,8 @@ +{ + "url": "https://github.com/opensearch-project/geospatial/blob/main/src/test/resources/ip2geo/server/country/country.zip", + "db_name": "data.csv", + "sha256_hash": "oDPgEv+4+kNov7bdQQiLrhr8jQeEPdLnuJ11Hz5npvk=", + "valid_for_in_days": 30, + "updated_at_in_epoch_milli": 1683590400000, + "provider": "opensearch" +} diff --git a/src/test/resources/ip2geo/server/country/manifest_local.json b/src/test/resources/ip2geo/server/country/manifest_local.json new file mode 100644 index 00000000..4c63840b --- /dev/null +++ b/src/test/resources/ip2geo/server/country/manifest_local.json @@ -0,0 +1,8 @@ +{ + "url": "http://localhost:8001/country/country.zip", + "db_name": "data.csv", + "sha256_hash": "oDPgEv+4+kNov7bdQQiLrhr8jQeEPdLnuJ11Hz5npvk=", + "valid_for_in_days": 30, + "updated_at_in_epoch_milli": 1683590400000, + "provider": "opensearch" +}