Skip to content

Commit

Permalink
Add integration test for Ip2GeoProcessor (opensearch-project#306)
Browse files Browse the repository at this point in the history
Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 21, 2023
1 parent 8d5c6bf commit 3adb06a
Show file tree
Hide file tree
Showing 15 changed files with 456 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* Generic ResourceInUseException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*/
public class ResourceInUseException extends OpenSearchException {

public ResourceInUseException(String msg, Object... args) {
super(msg, args);
}

public ResourceInUseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ResourceInUseException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException

private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
Expand All @@ -119,7 +120,7 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
/**
* @param name the datasource name
* @return the datasource name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import static org.opensearch.search.aggregations.Aggregations.AGGREGATIONS_FIELD;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -47,11 +50,11 @@
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONRequestContent;
import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper;
import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
import org.opensearch.ingest.Pipeline;

public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCase {

public static final String SOURCE = "_source";
public static final String DOC = "_doc";
public static final String URL_DELIMITER = "/";
Expand All @@ -69,11 +72,23 @@ public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCas
public static final String SHAPE_ID_FIELD = "id";
public static final String SHAPE_INDEX_PATH_FIELD = "path";
public static final String QUERY_PARAM_TOKEN = "?";
private static final String SETTINGS = "_settings";
private static final String SIMULATE = "_simulate";
private static final String DOCS = "docs";
private static final String DATASOURCES = "datasources";
private static final String STATE = "state";
private static final String PUT = "PUT";
private static final String GET = "GET";
private static final String DELETE = "DELETE";

private static String buildPipelinePath(String name) {
return String.join(URL_DELIMITER, "_ingest", "pipeline", name);
}

private static String buildDatasourcePath(String name) {
return String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name);
}

protected static Response createPipeline(String name, Optional<String> description, List<Map<String, Object>> processorConfigs)
throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
Expand All @@ -95,6 +110,74 @@ protected static void deletePipeline(String name) throws IOException {
client().performRequest(request);
}

protected Response createDatasource(final String name, Map<String, String> properties) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, String> config : properties.entrySet()) {
builder.field(config.getKey(), config.getValue());
}
builder.endObject();

Request request = new Request(PUT, buildDatasourcePath(name));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected void waitForDatasourceToBeAvailable(final String name, final Duration timeout) throws Exception {
Instant start = Instant.now();
while (DatasourceState.AVAILABLE.equals(getDatasourceState(name)) == false) {
if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) {
throw new RuntimeException(
String.format(
Locale.ROOT,
"Datasource state didn't change to %s after %d seconds",
DatasourceState.AVAILABLE.name(),
timeout.toSeconds()
)
);
}
Thread.sleep(1000);
}
}

private DatasourceState getDatasourceState(final String name) throws Exception {
List<Map<String, Object>> datasources = (List<Map<String, Object>>) getDatasource(name).get(DATASOURCES);
return DatasourceState.valueOf((String) datasources.get(0).get(STATE));
}

protected Response deleteDatasource(final String name) throws IOException {
Request request = new Request(DELETE, buildDatasourcePath(name));
return client().performRequest(request);
}

protected Map<String, Object> getDatasource(final String name) throws Exception {
Request request = new Request(GET, buildDatasourcePath(name));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected Response updateDatasource(final String name, Map<String, Object> config) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
if (config != null && !config.isEmpty()) {
builder.value(config);
}
builder.endObject();

Request request = new Request(PUT, String.join(URL_DELIMITER, buildDatasourcePath(name), SETTINGS));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected Map<String, Object> simulatePipeline(final String name, List<Object> docs) throws Exception {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field(DOCS, docs);
builder.endObject();

Request request = new Request(GET, String.join(URL_DELIMITER, buildPipelinePath(name), SIMULATE));
request.setJsonEntity(Strings.toString(builder));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected static void createIndex(String name, Settings settings, Map<String, String> fieldMap) throws IOException {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject(MAPPING_PROPERTIES_KEY);
for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
Expand Down Expand Up @@ -136,7 +219,7 @@ public static String indexDocument(String indexName, String docID, String body,
return docID;
}

protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, String> properties) {
protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, Object> properties) {
Map<String, Object> featureProcessor = new HashMap<>();
featureProcessor.put(processorType, properties);
return featureProcessor;
Expand Down
127 changes: 127 additions & 0 deletions src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;

import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;

import org.opensearch.common.SuppressForbidden;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

/**
* Simple http server to serve static files under test/java/resources/ip2geo/server for integration testing
*/
@Log4j2
@SuppressForbidden(reason = "used only for testing")
public class Ip2GeoDataServer {
private static final String SYS_PROPERTY_KEY_CLUSTER_ENDPOINT = "tests.rest.cluster";
private static final String LOCAL_CLUSTER_ENDPOINT = "127.0.0.1";
private static final String ROOT = "ip2geo/server";
private static final int PORT = 8001;
private static final String EXTERNAL_ENDPOINT_PREFIX =
"https://github.com/opensearch-project/geospatial/blob/main/src/test/resources/ip2geo/server";

private static HttpServer server;
private static volatile int counter = 0;
private static String endpointPrefix = "http://localhost:" + PORT;
private static String cityFilePath = endpointPrefix + "/city/manifest_local.json";
private static String countryFilePath = endpointPrefix + "/country/manifest_local.json";

/**
* Return an endpoint to a manifest file for a sample city data
* The sample data should contain three lines as follows
*
* cidr,city,country
* 10.0.0.0/8,Seattle,USA
* 127.0.0.0/12,Vancouver,Canada
* fd12:2345:6789:1::/64,Bengaluru,India
*
*/
public static String getEndpointCity() {
return cityFilePath;
}

/**
* Return an endpoint to a manifest file for a sample country data
* The sample data should contain three lines as follows
*
* cidr,country
* 10.0.0.0/8,USA
* 127.0.0.0/12,Canada
* fd12:2345:6789:1::/64,India
*
*/
public static String getEndpointCountry() {
return countryFilePath;
}

@SneakyThrows
synchronized public static void start() {
log.info("Start server is called");
// If it is remote cluster test, use external endpoint and do not launch local server
if (System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT).contains(LOCAL_CLUSTER_ENDPOINT) == false) {
log.info("Remote cluster[{}] testing. Skip launching local server", System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT));
cityFilePath = EXTERNAL_ENDPOINT_PREFIX + "/city/manifest.json";
countryFilePath = EXTERNAL_ENDPOINT_PREFIX + "/country/manifest.json";
return;
}

counter++;
if (server != null) {
log.info("Server has started already");
return;
}
server = HttpServer.create(new InetSocketAddress("localhost", PORT), 0);
server.createContext("/", new Ip2GeoHttpHandler());
server.start();
log.info("Local file server started on port {}", PORT);
}

synchronized public static void stop() {
log.info("Stop server is called");
if (server == null) {
log.info("Server has stopped already");
return;
}
counter--;
if (counter > 0) {
log.info("[{}] processors are still using the server", counter);
return;
}

server.stop(0);
server = null;
log.info("Server stopped");
}

@SuppressForbidden(reason = "used only for testing")
private static class Ip2GeoHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
try {
byte[] data = Files.readAllBytes(
Paths.get(this.getClass().getClassLoader().getResource(ROOT + exchange.getRequestURI().getPath()).toURI())
);
exchange.sendResponseHeaders(200, data.length);
OutputStream outputStream = exchange.getResponseBody();
outputStream.write(data);
outputStream.flush();
outputStream.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ protected long randomPositiveLong() {
}

protected Datasource randomDatasource() {
int validForInDays = Randomness.get().nextInt(30) + 2;
int validForInDays = Randomness.get().nextInt(30);
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, validForInDays - 1, ChronoUnit.DAYS));
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS));
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
Expand Down Expand Up @@ -83,8 +84,9 @@ private void validateDoExecuteWithLockError(final Exception exception) {
public void testDoExecute_whenValidInput_thenUpdate() {
Datasource datasource = randomDatasource();
UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName());
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval() + 1));
request.setEndpoint(sampleManifestUrl());
// Sample manifest has validForDays of 30. Update interval should be less than that.
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29)));

Task task = mock(Task.class);
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
Expand Down
Loading

0 comments on commit 3adb06a

Please sign in to comment.