Skip to content

Commit

Permalink
Add integration test for Ip2GeoProcessor
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed May 12, 2023
1 parent d6d23bb commit bc9f0d2
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* Generic ResourceInUseException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*/
public class ResourceInUseException extends OpenSearchException {

public ResourceInUseException(String msg, Object... args) {
super(msg, args);
}

public ResourceInUseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ResourceInUseException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException

private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
Expand All @@ -119,7 +120,7 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
/**
* @param name the datasource name
* @return the datasource name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import static org.opensearch.search.aggregations.Aggregations.AGGREGATIONS_FIELD;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;
Expand All @@ -46,6 +49,7 @@
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONRequestContent;
import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper;
import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
import org.opensearch.ingest.Pipeline;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -95,6 +99,69 @@ protected static void deletePipeline(String name) throws IOException {
client().performRequest(request);
}

protected Response createDatasource(final String name, Map<String, String> properties) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, String> config : properties.entrySet()) {
builder.field(config.getKey(), config.getValue());
}
builder.endObject();

Request request = new Request("PUT", String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected void waitForDatasourceStateTobe(final String name, final DatasourceState state, final Duration timeout) throws Exception {
Instant start = Instant.now();
while (state.equals(getDatasourceState(name)) == false) {
if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) {
throw new RuntimeException(
String.format(Locale.ROOT, "Datasource state didn't change to %s after %d seconds", state.name(), timeout.toSeconds())
);
}
Thread.sleep(1000);
}
}

private DatasourceState getDatasourceState(final String name) throws Exception {
List<Map<String, Object>> datasources = (List<Map<String, Object>>) getDatasource(name).get("datasources");
return DatasourceState.valueOf((String) datasources.get(0).get("state"));
}

protected Response deleteDatasource(final String name) throws IOException {
Request request = new Request("DELETE", String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name));
return client().performRequest(request);
}

protected Map<String, Object> getDatasource(final String name) throws Exception {
Request request = new Request("GET", String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected Response updateDatasource(final String name, Map<String, Object> config) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
if (config != null && !config.isEmpty()) {
builder.value(config);
}
builder.endObject();

Request request = new Request("PUT", String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name, "_settings"));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected Map<String, Object> simulatePipeline(final String name, List<Object> docs) throws Exception {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field("docs", docs);
builder.endObject();

Request request = new Request("GET", String.join(URL_DELIMITER, "_ingest/pipeline", name, "_simulate"));
request.setJsonEntity(Strings.toString(builder));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected static void createIndex(String name, Settings settings, Map<String, String> fieldMap) throws IOException {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject(MAPPING_PROPERTIES_KEY);
for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
Expand Down Expand Up @@ -136,7 +203,7 @@ public static String indexDocument(String indexName, String docID, String body,
return docID;
}

protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, String> properties) {
protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, Object> properties) {
Map<String, Object> featureProcessor = new HashMap<>();
featureProcessor.put(processorType, properties);
return featureProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,42 @@

package org.opensearch.geospatial.ip2geo.processor;

import lombok.SneakyThrows;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.common.Randomness;
import org.opensearch.geospatial.GeospatialRestTestCase;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

import lombok.SneakyThrows;

import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.geospatial.GeospatialRestTestCase;
import org.opensearch.geospatial.GeospatialTestHelper;
import java.util.stream.Collectors;

public class Ip2GeoProcessorIT extends GeospatialRestTestCase {
/**
* This is sample manifest file pointing to a sample csv data
* The sample data contains three lines as follows
*
* cidr,city,country
* 10.0.0.0/8,Seattle,USA
* 127.0.0.0/12,Vancouver,Canada
* fd12:2345:6789:1::/64,Bengaluru,India
*
*/
private static final String SAMPLE_ENDPOINT = "https://geoip.maps.opensearch.org/v1/sample/manifest.json";
private static final String CITY = "city";
private static final String COUNTRY = "country";
private static final String IP = "ip";
private static final String SOURCE = "_source";

@SneakyThrows
public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() {
Expand All @@ -33,22 +54,163 @@ public void testCreateIp2GeoProcessor_whenNoSuchDatasourceExist_thenFails() {

// Verify
assertTrue(exception.getMessage().contains("doesn't exist"));
assertEquals(400, exception.getResponse().getStatusLine().getStatusCode());
}

@SneakyThrows
public void testCreateIp2GeoProcessor_whenValidInput_thenAddData() {
String pipelineName = GeospatialTestHelper.randomLowerCaseString();
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
String targetField = GeospatialTestHelper.randomLowerCaseString();
String field = GeospatialTestHelper.randomLowerCaseString();

Map<String, String> datasourceProperties = Map.of(PutDatasourceRequest.ENDPOINT_FIELD.getPreferredName(), SAMPLE_ENDPOINT);

// Create datasource and wait for it to be available
createDatasource(datasourceName, datasourceProperties);
waitForDatasourceStateTobe(datasourceName, DatasourceState.AVAILABLE, Duration.ofSeconds(10));

Map<String, Object> processorProperties = Map.of(
Ip2GeoProcessor.CONFIG_FIELD,
field,
Ip2GeoProcessor.CONFIG_DATASOURCE,
datasourceName,
Ip2GeoProcessor.CONFIG_TARGET_FIELD,
targetField,
Ip2GeoProcessor.CONFIG_PROPERTIES,
Arrays.asList(IP, CITY)
);

// Create ip2geo processor
createIp2GeoProcessorPipeline(pipelineName, processorProperties);

Map<String, Map<String, String>> sampleData = getSampleData();
List<Object> docs = sampleData.entrySet().stream().map(entry -> createDocument(field, entry.getKey())).collect(Collectors.toList());

// Simulate processor
Map<String, Object> response = simulatePipeline(pipelineName, docs);

// Verify data added to document
List<Map<String, String>> sources = convertToListOfSources(response, targetField);
sources.stream().forEach(source -> {
assertFalse(source.containsKey(COUNTRY));
assertEquals(sampleData.get(source.get(IP)).get(CITY), source.get(CITY));
});

// Delete datasource fails when there is a process using it
ResponseException exception = expectThrows(ResponseException.class, () -> deleteDatasource(datasourceName));
// Verify
assertEquals(400, exception.getResponse().getStatusLine().getStatusCode());

// Delete resources
deletePipeline(pipelineName);
deleteDatasource(datasourceName);
}

private Response createIp2GeoProcessorPipeline(final String pipelineName, final Map<String, String> properties) throws IOException {
private Response createIp2GeoProcessorPipeline(final String pipelineName, final Map<String, Object> properties) throws IOException {
String field = GeospatialTestHelper.randomLowerCaseString();
String datasourceName = GeospatialTestHelper.randomLowerCaseString();
Map<String, String> defaultProperties = Map.of(
Map<String, Object> defaultProperties = Map.of(
Ip2GeoProcessor.CONFIG_FIELD,
field,
Ip2GeoProcessor.CONFIG_DATASOURCE,
datasourceName
);
Map<String, String> baseProperties = new HashMap<>();
Map<String, Object> baseProperties = new HashMap<>();
baseProperties.putAll(defaultProperties);
baseProperties.putAll(properties);
Map<String, Object> processorConfig = buildProcessorConfig(Ip2GeoProcessor.TYPE, baseProperties);

return createPipeline(pipelineName, Optional.empty(), Arrays.asList(processorConfig));
}

private Map<String, Map<String, String>> getSampleData() {
Map<String, Map<String, String>> sampleData = new HashMap<>();
sampleData.put(
String.format(
Locale.ROOT,
"10.%d.%d.%d",
Randomness.get().nextInt(255),
Randomness.get().nextInt(255),
Randomness.get().nextInt(255)
),
Map.of(CITY, "Seattle", COUNTRY, "USA")
);
sampleData.put(
String.format(
Locale.ROOT,
"127.%d.%d.%d",
Randomness.get().nextInt(15),
Randomness.get().nextInt(255),
Randomness.get().nextInt(255)
),
Map.of(CITY, "Vancouver", COUNTRY, "Canada")
);
sampleData.put(
String.format(
Locale.ROOT,
"fd12:2345:6789:1:%x:%x:%x:%x",
Randomness.get().nextInt(65535),
Randomness.get().nextInt(65535),
Randomness.get().nextInt(65535),
Randomness.get().nextInt(65535)
),
Map.of(CITY, "Bengaluru", COUNTRY, "India")
);
return sampleData;
}

private Map<String, Map<String, String>> createDocument(String... args) {
if (args.length % 2 == 1) {
throw new RuntimeException("Number of arguments should be even");
}
Map<String, String> source = new HashMap<>();
for (int i = 0; i < args.length; i += 2) {
source.put(args[0], args[1]);
}
return Map.of(SOURCE, source);
}

/**
* This method convert returned value of simulatePipeline method to a list of sources
*
* For example,
* Input:
* {
* "docs" : [
* {
* "doc" : {
* "_index" : "_index",
* "_id" : "_id",
* "_source" : {
* "ip2geo" : {
* "ip" : "127.0.0.1",
* "city" : "Seattle"
* },
* "_ip" : "127.0.0.1"
* },
* "_ingest" : {
* "timestamp" : "2023-05-12T17:41:42.939703Z"
* }
* }
* }
* ]
* }
*
* Output:
* [
* {
* "ip" : "127.0.0.1",
* "city" : "Seattle"
* }
* ]
*
*/
private List<Map<String, String>> convertToListOfSources(final Map<String, Object> data, final String targetField) {
List<Map<String, Map<String, Object>>> docs = (List<Map<String, Map<String, Object>>>) data.get("docs");
return docs.stream()
.map(doc -> (Map<String, Map<String, String>>) doc.get("doc").get(SOURCE))
.map(source -> source.get(targetField))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testIndexGeoJSONSuccess() throws Exception {
Map<String, String> geoFields = new HashMap<>();
geoFields.put(geoShapeField, "geo_shape");

Map<String, String> processorProperties = new HashMap<>();
Map<String, Object> processorProperties = new HashMap<>();
processorProperties.put(FeatureProcessor.FIELD_KEY, geoShapeField);
Map<String, Object> geoJSONProcessorConfig = buildProcessorConfig(FeatureProcessor.TYPE, processorProperties);
List<Map<String, Object>> configs = new ArrayList<>();
Expand Down

0 comments on commit bc9f0d2

Please sign in to comment.