Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add integration test for Ip2GeoProcessor #306

Merged
merged 1 commit into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.rest.RestStatus;

/**
* Generic ResourceInUseException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*/
public class ResourceInUseException extends OpenSearchException {

public ResourceInUseException(String msg, Object... args) {
super(msg, args);
}

public ResourceInUseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ResourceInUseException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException

private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
Expand All @@ -119,7 +120,7 @@ private void setDatasourceStateAsDeleting(final Datasource datasource) {
if (ip2GeoProcessorFacade.getProcessors(datasource.getName()).isEmpty() == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
throw new ResourceInUseException("datasource is being used by one of processors");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
@Log4j2
@EqualsAndHashCode
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
private static final int MAX_DATASOURCE_NAME_BYTES = 255;
public static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
public static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
/**
* @param name the datasource name
* @return the datasource name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import static org.opensearch.search.aggregations.Aggregations.AGGREGATIONS_FIELD;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.stream.IntStream;
Expand All @@ -46,12 +49,12 @@
import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONRequestContent;
import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper;
import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
import org.opensearch.ingest.Pipeline;
import org.opensearch.rest.RestStatus;

public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCase {

public static final String SOURCE = "_source";
public static final String DOC = "_doc";
public static final String URL_DELIMITER = "/";
Expand All @@ -69,11 +72,23 @@ public abstract class GeospatialRestTestCase extends OpenSearchSecureRestTestCas
public static final String SHAPE_ID_FIELD = "id";
public static final String SHAPE_INDEX_PATH_FIELD = "path";
public static final String QUERY_PARAM_TOKEN = "?";
private static final String SETTINGS = "_settings";
private static final String SIMULATE = "_simulate";
private static final String DOCS = "docs";
private static final String DATASOURCES = "datasources";
private static final String STATE = "state";
private static final String PUT = "PUT";
private static final String GET = "GET";
private static final String DELETE = "DELETE";

private static String buildPipelinePath(String name) {
return String.join(URL_DELIMITER, "_ingest", "pipeline", name);
}

private static String buildDatasourcePath(String name) {
return String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource", name);
}

protected static Response createPipeline(String name, Optional<String> description, List<Map<String, Object>> processorConfigs)
throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
Expand All @@ -95,6 +110,74 @@ protected static void deletePipeline(String name) throws IOException {
client().performRequest(request);
}

protected Response createDatasource(final String name, Map<String, String> properties) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
for (Map.Entry<String, String> config : properties.entrySet()) {
builder.field(config.getKey(), config.getValue());
}
builder.endObject();

Request request = new Request(PUT, buildDatasourcePath(name));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected void waitForDatasourceToBeAvailable(final String name, final Duration timeout) throws Exception {
Instant start = Instant.now();
while (DatasourceState.AVAILABLE.equals(getDatasourceState(name)) == false) {
if (Duration.between(start, Instant.now()).compareTo(timeout) > 0) {
throw new RuntimeException(
String.format(
Locale.ROOT,
"Datasource state didn't change to %s after %d seconds",
DatasourceState.AVAILABLE.name(),
timeout.toSeconds()
)
);
}
Thread.sleep(1000);
}
}

private DatasourceState getDatasourceState(final String name) throws Exception {
List<Map<String, Object>> datasources = (List<Map<String, Object>>) getDatasource(name).get(DATASOURCES);
return DatasourceState.valueOf((String) datasources.get(0).get(STATE));
}

protected Response deleteDatasource(final String name) throws IOException {
Request request = new Request(DELETE, buildDatasourcePath(name));
return client().performRequest(request);
}

protected Map<String, Object> getDatasource(final String name) throws Exception {
Request request = new Request(GET, buildDatasourcePath(name));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected Response updateDatasource(final String name, Map<String, Object> config) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
if (config != null && !config.isEmpty()) {
builder.value(config);
}
builder.endObject();

Request request = new Request(PUT, String.join(URL_DELIMITER, buildDatasourcePath(name), SETTINGS));
request.setJsonEntity(Strings.toString(builder));
return client().performRequest(request);
}

protected Map<String, Object> simulatePipeline(final String name, List<Object> docs) throws Exception {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field(DOCS, docs);
builder.endObject();

Request request = new Request(GET, String.join(URL_DELIMITER, buildPipelinePath(name), SIMULATE));
request.setJsonEntity(Strings.toString(builder));
Response response = client().performRequest(request);
return createParser(XContentType.JSON.xContent(), EntityUtils.toString(response.getEntity())).map();
}

protected static void createIndex(String name, Settings settings, Map<String, String> fieldMap) throws IOException {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder().startObject().startObject(MAPPING_PROPERTIES_KEY);
for (Map.Entry<String, String> entry : fieldMap.entrySet()) {
Expand Down Expand Up @@ -136,7 +219,7 @@ public static String indexDocument(String indexName, String docID, String body,
return docID;
}

protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, String> properties) {
protected Map<String, Object> buildProcessorConfig(final String processorType, final Map<String, Object> properties) {
Map<String, Object> featureProcessor = new HashMap<>();
featureProcessor.put(processorType, properties);
return featureProcessor;
Expand Down
127 changes: 127 additions & 0 deletions src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoDataServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Paths;

import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;

import org.opensearch.common.SuppressForbidden;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

/**
* Simple http server to serve static files under test/java/resources/ip2geo/server for integration testing
*/
@Log4j2
@SuppressForbidden(reason = "used only for testing")
public class Ip2GeoDataServer {
private static final String SYS_PROPERTY_KEY_CLUSTER_ENDPOINT = "tests.rest.cluster";
private static final String LOCAL_CLUSTER_ENDPOINT = "127.0.0.1";
private static final String ROOT = "ip2geo/server";
private static final int PORT = 8001;
private static final String EXTERNAL_ENDPOINT_PREFIX =
"https://github.com/opensearch-project/geospatial/blob/main/src/test/resources/ip2geo/server";

private static HttpServer server;
private static volatile int counter = 0;
private static String endpointPrefix = "http://localhost:" + PORT;
private static String cityFilePath = endpointPrefix + "/city/manifest_local.json";
private static String countryFilePath = endpointPrefix + "/country/manifest_local.json";

/**
* Return an endpoint to a manifest file for a sample city data
* The sample data should contain three lines as follows
*
* cidr,city,country
* 10.0.0.0/8,Seattle,USA
* 127.0.0.0/12,Vancouver,Canada
* fd12:2345:6789:1::/64,Bengaluru,India
*
*/
public static String getEndpointCity() {
return cityFilePath;
}

/**
* Return an endpoint to a manifest file for a sample country data
* The sample data should contain three lines as follows
*
* cidr,country
* 10.0.0.0/8,USA
* 127.0.0.0/12,Canada
* fd12:2345:6789:1::/64,India
*
*/
public static String getEndpointCountry() {
return countryFilePath;
}

@SneakyThrows
synchronized public static void start() {
log.info("Start server is called");
// If it is remote cluster test, use external endpoint and do not launch local server
if (System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT).contains(LOCAL_CLUSTER_ENDPOINT) == false) {
log.info("Remote cluster[{}] testing. Skip launching local server", System.getProperty(SYS_PROPERTY_KEY_CLUSTER_ENDPOINT));
cityFilePath = EXTERNAL_ENDPOINT_PREFIX + "/city/manifest.json";
countryFilePath = EXTERNAL_ENDPOINT_PREFIX + "/country/manifest.json";
return;
}

counter++;
if (server != null) {
log.info("Server has started already");
return;
}
server = HttpServer.create(new InetSocketAddress("localhost", PORT), 0);
server.createContext("/", new Ip2GeoHttpHandler());
server.start();
log.info("Local file server started on port {}", PORT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add similar log for before start event? in case server failed to start such log message can help in troubleshooting

}

synchronized public static void stop() {
log.info("Stop server is called");
if (server == null) {
log.info("Server has stopped already");
return;
}
counter--;
if (counter > 0) {
log.info("[{}] processors are still using the server", counter);
return;
}

server.stop(0);
server = null;
log.info("Server stopped");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as for server start, please add logging before server stop call

}

@SuppressForbidden(reason = "used only for testing")
private static class Ip2GeoHttpHandler implements HttpHandler {
@Override
public void handle(final HttpExchange exchange) throws IOException {
try {
byte[] data = Files.readAllBytes(
Paths.get(this.getClass().getClassLoader().getResource(ROOT + exchange.getRequestURI().getPath()).toURI())
);
exchange.sendResponseHeaders(200, data.length);
OutputStream outputStream = exchange.getResponseBody();
outputStream.write(data);
outputStream.flush();
outputStream.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,11 @@ protected long randomPositiveLong() {
}

protected Datasource randomDatasource() {
int validForInDays = Randomness.get().nextInt(30) + 2;
int validForInDays = Randomness.get().nextInt(30);
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
Datasource datasource = new Datasource();
datasource.setName(GeospatialTestHelper.randomLowerCaseString());
datasource.setSchedule(new IntervalSchedule(now, validForInDays - 1, ChronoUnit.DAYS));
datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(29), ChronoUnit.DAYS));
datasource.setState(randomState());
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
Expand Down Expand Up @@ -83,8 +84,9 @@ private void validateDoExecuteWithLockError(final Exception exception) {
public void testDoExecute_whenValidInput_thenUpdate() {
Datasource datasource = randomDatasource();
UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName());
request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval() + 1));
request.setEndpoint(sampleManifestUrl());
// Sample manifest has validForDays of 30. Update interval should be less than that.
request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29)));

Task task = mock(Task.class);
when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource);
Expand Down
Loading