From d77c90e7161390eb728c1aee7c912def547e1ef0 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Fri, 5 May 2023 14:20:01 -0700 Subject: [PATCH] Implement datasource update API Signed-off-by: Heemin Kim --- .../action/RestUpdateDatasourceHandler.java | 52 ++++ .../ip2geo/action/UpdateDatasourceAction.java | 27 ++ .../action/UpdateDatasourceRequest.java | 168 +++++++++++++ .../UpdateDatasourceTransportAction.java | 185 ++++++++++++++ .../jobscheduler/DatasourceUpdateService.java | 27 +- .../geospatial/plugin/GeospatialPlugin.java | 32 ++- .../geospatial/ip2geo/Ip2GeoTestCase.java | 9 +- .../action/RestPutDatasourceHandlerTests.java | 5 +- .../RestUpdateDatasourceHandlerTests.java | 56 +++++ .../action/UpdateDatasourceRequestTests.java | 121 +++++++++ .../UpdateDatasourceTransportActionTests.java | 231 ++++++++++++++++++ .../DatasourceUpdateServiceTests.java | 26 ++ .../plugin/GeospatialPluginTests.java | 4 +- 13 files changed, 928 insertions(+), 15 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequestTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java new file mode 100644 index 00000000..77abae84 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandler.java @@ -0,0 +1,52 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; +import static org.opensearch.rest.RestRequest.Method.PUT; + +import java.io.IOException; +import java.util.List; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +/** + * Rest handler for Ip2Geo datasource update request + */ +public class RestUpdateDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "ip2geo_datasource_update"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name")); + if (request.hasContentOrSourceParam()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); + } + } + return channel -> client.executeLocally( + UpdateDatasourceAction.INSTANCE, + putDatasourceRequest, + new RestToXContentListener<>(channel) + ); + } + + @Override + public List routes() { + String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}/_settings"); + return List.of(new Route(PUT, path)); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceAction.java new file mode 100644 index 00000000..96cd00df --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceAction.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Ip2Geo datasource update action + */ +public class UpdateDatasourceAction extends ActionType { + /** + * Update datasource action instance + */ + public static final UpdateDatasourceAction INSTANCE = new UpdateDatasourceAction(); + /** + * Update datasource action name + */ + public static final String NAME = "cluster:admin/geospatial/datasource/update"; + + private UpdateDatasourceAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java new file mode 100644 index 00000000..fdc0b357 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequest.java @@ -0,0 +1,168 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Locale; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; + +/** + * Ip2Geo datasource update request + */ +@Getter +@Setter +@Log4j2 +@EqualsAndHashCode(callSuper = false) +public class UpdateDatasourceRequest extends AcknowledgedRequest { + private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + private static final int MAX_DATASOURCE_NAME_BYTES = 255; + /** + * @param name the datasource name + * @return the datasource name + */ + private String name; + /** + * @param endpoint url to a manifest file for a datasource + * @return url to a manifest file for a datasource + */ + private String endpoint; + /** + * @param updateInterval update interval of a datasource + * @return update interval of a datasource + */ + private TimeValue updateInterval; + + /** + * Parser of a datasource + */ + public static final ObjectParser PARSER; + static { + PARSER = new ObjectParser<>("update_datasource"); + PARSER.declareString((request, val) -> request.setEndpoint(val), ENDPOINT_FIELD); + PARSER.declareLong((request, val) -> request.setUpdateInterval(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD); + } + + /** + * Constructor + * @param name name of a datasource + */ + public UpdateDatasourceRequest(final String name) { + this.name = name; + } + + /** + * Constructor + * @param in the stream input + * @throws IOException IOException + */ + public UpdateDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + this.endpoint = in.readOptionalString(); + this.updateInterval = in.readOptionalTimeValue(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeOptionalString(endpoint); + out.writeOptionalTimeValue(updateInterval); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = new ActionRequestValidationException(); + if (endpoint == null && updateInterval == null) { + errors.addValidationError("no values to update"); + } + + validateEndpoint(errors); + validateUpdateInterval(errors); + + return errors.validationErrors().isEmpty() ? null : errors; + } + + /** + * Conduct following validation on endpoint + * 1. endpoint format complies with RFC-2396 + * 2. validate manifest file from the endpoint + * + * @param errors the errors to add error messages + */ + private void validateEndpoint(final ActionRequestValidationException errors) { + if (endpoint == null) { + return; + } + + try { + URL url = new URL(endpoint); + url.toURI(); // Validate URL complies with RFC-2396 + validateManifestFile(url, errors); + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided", endpoint, e); + errors.addValidationError("Invalid URL format is provided"); + } + } + + /** + * Conduct following validation on url + * 1. can read manifest file from the endpoint + * 2. the url in the manifest file complies with RFC-2396 + * + * @param url the url to validate + * @param errors the errors to add error messages + */ + private void validateManifestFile(final URL url, final ActionRequestValidationException errors) { + DatasourceManifest manifest; + try { + manifest = DatasourceManifest.Builder.build(url); + } catch (Exception e) { + log.info("Error occurred while reading a file from {}", url, e); + errors.addValidationError(String.format(Locale.ROOT, "Error occurred while reading a file from %s: %s", url, e.getMessage())); + return; + } + + try { + new URL(manifest.getUrl()).toURI(); // Validate URL complies with RFC-2396 + } catch (MalformedURLException | URISyntaxException e) { + log.info("Invalid URL[{}] is provided for url field in the manifest file", manifest.getUrl(), e); + errors.addValidationError("Invalid URL format is provided for url field in the manifest file"); + } + } + + /** + * Validate updateInterval is equal or larger than 1 + * + * @param errors the errors to add error messages + */ + private void validateUpdateInterval(final ActionRequestValidationException errors) { + if (updateInterval == null) { + return; + } + + if (updateInterval.compareTo(TimeValue.timeValueDays(1)) < 0) { + errors.addValidationError("Update interval should be equal to or larger than 1 day"); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java new file mode 100644 index 00000000..20634552 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportAction.java @@ -0,0 +1,185 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.net.URL; +import java.security.InvalidParameterException; +import java.time.temporal.ChronoUnit; +import java.util.List; +import java.util.Locale; + +import lombok.extern.log4j.Log4j2; + +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport action to update datasource + */ +@Log4j2 +public class UpdateDatasourceTransportAction extends HandledTransportAction { + private static final long LOCK_DURATION_IN_SECONDS = 300l; + private final Ip2GeoLockService lockService; + private final DatasourceFacade datasourceFacade; + private final DatasourceUpdateService datasourceUpdateService; + + /** + * Constructor + * + * @param transportService the transport service + * @param actionFilters the action filters + * @param lockService the lock service + * @param datasourceFacade the datasource facade + * @param datasourceUpdateService the datasource update service + */ + @Inject + public UpdateDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final Ip2GeoLockService lockService, + final DatasourceFacade datasourceFacade, + final DatasourceUpdateService datasourceUpdateService + ) { + super(UpdateDatasourceAction.NAME, transportService, actionFilters, UpdateDatasourceRequest::new); + this.lockService = lockService; + this.datasourceUpdateService = datasourceUpdateService; + this.datasourceFacade = datasourceFacade; + } + + /** + * Get a lock and update datasource + * + * @param task the task + * @param request the request + * @param listener the listener + */ + @Override + protected void doExecute(final Task task, final UpdateDatasourceRequest request, final ActionListener listener) { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + return; + } + try { + Datasource datasource = datasourceFacade.getDatasource(request.getName()); + if (datasource == null) { + listener.onFailure(new ResourceNotFoundException("no such datasource exist")); + return; + } + validate(request, datasource); + updateIfChanged(request, datasource); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + listener.onFailure(e); + } finally { + lockService.releaseLock( + lock, + ActionListener.wrap( + released -> { log.info("Released lock for datasource[{}]", request.getName()); }, + exception -> { log.error("Failed to release the lock", exception); } + ) + ); + } + }, exception -> { listener.onFailure(exception); })); + } + + private void updateIfChanged(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { + boolean isChanged = false; + if (isEndpointChanged(request, datasource)) { + datasource.setEndpoint(request.getEndpoint()); + isChanged = true; + } + + if (isUpdateIntervalChanged(request, datasource)) { + datasource.setSchedule( + new IntervalSchedule(datasource.getSchedule().getStartTime(), (int) request.getUpdateInterval().getDays(), ChronoUnit.DAYS) + ); + isChanged = true; + } + + if (isChanged) { + datasourceFacade.updateDatasource(datasource); + } + } + + /** + * Additional validation based on an existing datasource + * + * Basic validation is done in UpdateDatasourceRequest#validate + * In this method we do additional validation based on an existing datasource + * + * 1. Check the compatibility of new fields and old fields + * 2. Check the updateInterval is less than validForInDays in datasource + * + * This method throws exception if one of validation fails. + * + * @param request the update request + * @param datasource the existing datasource + * @throws IOException the exception + */ + private void validate(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { + validateFieldsCompatibility(request, datasource); + validateUpdateIntervalIsLessThanValidForInDays(request, datasource); + } + + private void validateFieldsCompatibility(final UpdateDatasourceRequest request, final Datasource datasource) throws IOException { + if (isEndpointChanged(request, datasource) == false) { + return; + } + + List fields = datasourceUpdateService.getHeaderFields(request.getEndpoint()); + if (datasource.isCompatible(fields) == false) { + throw new OpenSearchException( + "new fields [{}] does not contain all old fields [{}]", + fields.toString(), + datasource.getDatabase().getFields().toString() + ); + } + } + + private void validateUpdateIntervalIsLessThanValidForInDays(final UpdateDatasourceRequest request, final Datasource datasource) + throws IOException { + if (isEndpointChanged(request, datasource) == false && isUpdateIntervalChanged(request, datasource) == false) { + return; + } + + long validForInDays = isEndpointChanged(request, datasource) + ? DatasourceManifest.Builder.build(new URL(request.getEndpoint())).getValidForInDays() + : datasource.getDatabase().getValidForInDays(); + + long updateInterval = isUpdateIntervalChanged(request, datasource) + ? request.getUpdateInterval().days() + : datasource.getSchedule().getInterval(); + + if (updateInterval >= validForInDays) { + throw new InvalidParameterException( + String.format(Locale.ROOT, "updateInterval %d should be smaller than %d", updateInterval, validForInDays) + ); + } + } + + private boolean isEndpointChanged(final UpdateDatasourceRequest request, final Datasource datasource) { + return request.getEndpoint() != null && request.getEndpoint().equals(datasource.getEndpoint()) == false; + } + + private boolean isUpdateIntervalChanged(final UpdateDatasourceRequest request, final Datasource datasource) { + return request.getUpdateInterval() != null && (int) request.getUpdateInterval().days() != datasource.getSchedule().getInterval(); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 57be4d4a..4d9530db 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -48,11 +48,15 @@ public DatasourceUpdateService( /** * Update GeoIp data * + * The first column is ip range field regardless its header name. + * Therefore, we don't store the first column's header name. + * * @param datasource the datasource * @param renewLock runnable to renew lock - * @throws Exception + * + * @throws IOException */ - public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable renewLock) throws Exception { + public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable renewLock) throws IOException { URL url = new URL(datasource.getEndpoint()); DatasourceManifest manifest = DatasourceManifest.Builder.build(url); @@ -91,6 +95,25 @@ public void updateOrCreateGeoIpData(final Datasource datasource, final Runnable updateDatasourceAsSucceeded(datasource, manifest, fieldsToStore, startTime, endTime); } + /** + * Return header fields of geo data with given url of a manifest file + * + * The first column is ip range field regardless its header name. + * Therefore, we don't store the first column's header name. + * + * @param manifestUrl the url of a manifest file + * @return header fields of geo data + */ + public List getHeaderFields(String manifestUrl) throws IOException { + URL url = new URL(manifestUrl); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + try (CSVParser reader = geoIpDataFacade.getDatabaseReader(manifest)) { + String[] fields = reader.iterator().next().values(); + return Arrays.asList(fields).subList(1, fields.length); + } + } + /** * Delete all indices except the one which are being used * diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index ff086a15..48454667 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -42,6 +42,9 @@ import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; +import org.opensearch.geospatial.ip2geo.action.RestUpdateDatasourceHandler; +import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceAction; +import org.opensearch.geospatial.ip2geo.action.UpdateDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; @@ -159,22 +162,37 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of( - new RestUploadStatsAction(), - new RestUploadGeoJSONAction(), + List geoJsonHandlers = List.of(new RestUploadStatsAction(), new RestUploadGeoJSONAction()); + + List ip2geoHandlers = List.of( new RestPutDatasourceHandler(clusterSettings), - new RestGetDatasourceHandler() + new RestGetDatasourceHandler(), + new RestUpdateDatasourceHandler() ); + + List allHandlers = new ArrayList<>(); + allHandlers.addAll(geoJsonHandlers); + allHandlers.addAll(ip2geoHandlers); + return allHandlers; } @Override public List> getActions() { - return List.of( + List> geoJsonHandlers = List.of( new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class), - new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class), + new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class) + ); + + List> ip2geoHandlers = List.of( new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class), - new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class) + new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class), + new ActionHandler<>(UpdateDatasourceAction.INSTANCE, UpdateDatasourceTransportAction.class) ); + + List> allHandlers = new ArrayList<>(); + allHandlers.addAll(geoJsonHandlers); + allHandlers.addAll(ip2geoHandlers); + return allHandlers; } @Override diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index a28ba089..f1f456a7 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -19,6 +19,8 @@ import java.util.function.BiFunction; import java.util.stream.Collectors; +import lombok.SneakyThrows; + import org.junit.After; import org.junit.Before; import org.mockito.Mock; @@ -141,8 +143,9 @@ protected String randomIpAddress() { ); } + @SneakyThrows @SuppressForbidden(reason = "unit test") - protected String sampleManifestUrl() throws Exception { + protected String sampleManifestUrl() { return Paths.get(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").toURI()).toUri().toURL().toExternalForm(); } @@ -168,10 +171,10 @@ protected Datasource randomDatasource() { Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS); Datasource datasource = new Datasource(); datasource.setName(GeospatialTestHelper.randomLowerCaseString()); - datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(30) + 1, ChronoUnit.DAYS)); + datasource.setSchedule(new IntervalSchedule(now, Randomness.get().nextInt(10) + 1, ChronoUnit.DAYS)); datasource.setState(randomState()); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); - datasource.setEndpoint(GeospatialTestHelper.randomLowerCaseString()); + datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase() .setFields(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase().setProvider(GeospatialTestHelper.randomLowerCaseString()); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java index 97c3cb7d..c2609248 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandlerTests.java @@ -9,6 +9,7 @@ import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; import java.util.HashSet; +import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; @@ -40,7 +41,7 @@ public void testPrepareRequest() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); String content = "{\"endpoint\":\"https://test.com\", \"update_interval_in_days\":1}"; RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) - .withPath(String.format(path, datasourceName)) + .withPath(String.format(Locale.ROOT, path, datasourceName)) .withContent(new BytesArray(content), XContentType.JSON) .build(); AtomicBoolean isExecuted = new AtomicBoolean(false); @@ -62,7 +63,7 @@ public void testPrepareRequest() { public void testPrepareRequestDefaultValue() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) - .withPath(String.format(path, datasourceName)) + .withPath(String.format(Locale.ROOT, path, datasourceName)) .withContent(new BytesArray("{}"), XContentType.JSON) .build(); AtomicBoolean isExecuted = new AtomicBoolean(false); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java new file mode 100644 index 00000000..6af74414 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestUpdateDatasourceHandlerTests.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; + +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +public class RestUpdateDatasourceHandlerTests extends RestActionTestCase { + private String path; + private RestUpdateDatasourceHandler handler; + + @Before + public void setupAction() { + handler = new RestUpdateDatasourceHandler(); + controller().registerHandler(handler); + path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/%s/_settings"); + } + + public void testPrepareRequest_whenValidInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String content = "{\"endpoint\":\"https://test.com\", \"update_interval_in_days\":1}"; + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath(String.format(Locale.ROOT, path, datasourceName)) + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + AtomicBoolean isExecuted = new AtomicBoolean(false); + + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof PutDatasourceRequest); + PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; + assertEquals("https://test.com", putDatasourceRequest.getEndpoint()); + assertEquals(TimeValue.timeValueDays(1), putDatasourceRequest.getUpdateInterval()); + assertEquals(datasourceName, putDatasourceRequest.getName()); + isExecuted.set(true); + return null; + }); + + dispatchRequest(request); + assertTrue(isExecuted.get()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequestTests.java new file mode 100644 index 00000000..36ff271f --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceRequestTests.java @@ -0,0 +1,121 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.Locale; + +import lombok.SneakyThrows; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.Randomness; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class UpdateDatasourceRequestTests extends Ip2GeoTestCase { + + public void testValidate_whenNullValues_thenFails() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertEquals("no values to update", exception.validationErrors().get(0)); + } + + public void testValidate_whenInvalidUrl_thenFails() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setEndpoint("invalidUrl"); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertEquals("Invalid URL format is provided", exception.validationErrors().get(0)); + } + + public void testValidate_whenInvalidManifestFile_thenFails() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String domain = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertTrue(exception.validationErrors().get(0).contains("Error occurred while reading a file")); + } + + @SneakyThrows + public void testValidate_whenValidInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrl()); + request.setUpdateInterval(TimeValue.timeValueDays(1)); + + // Run and verify + assertNull(request.validate()); + } + + @SneakyThrows + public void testValidate_whenZeroUpdateInterval_thenFails() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setUpdateInterval(TimeValue.timeValueDays(0)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertEquals( + String.format(Locale.ROOT, "Update interval should be equal to or larger than 1 day"), + exception.validationErrors().get(0) + ); + } + + @SneakyThrows + public void testValidate_whenInvalidUrlInsideManifest_thenFail() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setEndpoint(sampleManifestUrlWithInvalidUrl()); + request.setUpdateInterval(TimeValue.timeValueDays(1)); + + // Run + ActionRequestValidationException exception = request.validate(); + + // Verify + assertEquals(1, exception.validationErrors().size()); + assertTrue(exception.validationErrors().get(0).contains("Invalid URL format")); + } + + @SneakyThrows + public void testStreamInOut_whenValidInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + String domain = GeospatialTestHelper.randomLowerCaseString(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasourceName); + request.setEndpoint(String.format(Locale.ROOT, "https://%s.com", domain)); + request.setUpdateInterval(TimeValue.timeValueDays(Randomness.get().nextInt(29) + 1)); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + UpdateDatasourceRequest copiedRequest = new UpdateDatasourceRequest(input); + + // Verify + assertEquals(request, copiedRequest); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java new file mode 100644 index 00000000..5364a147 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/UpdateDatasourceTransportActionTests.java @@ -0,0 +1,231 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.security.InvalidParameterException; +import java.util.List; + +import lombok.SneakyThrows; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.tasks.Task; + +public class UpdateDatasourceTransportActionTests extends Ip2GeoTestCase { + private UpdateDatasourceTransportAction action; + + @Before + public void init() { + action = new UpdateDatasourceTransportAction( + transportService, + actionFilters, + ip2GeoLockService, + datasourceFacade, + datasourceUpdateService + ); + } + + public void testDoExecute_whenFailedToAcquireLock_thenError() { + validateDoExecuteWithLockError(null); + } + + public void testDoExecute_whenExceptionToAcquireLock_thenError() { + validateDoExecuteWithLockError(new RuntimeException()); + } + + private void validateDoExecuteWithLockError(final Exception exception) { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + if (exception == null) { + // Run + captor.getValue().onResponse(null); + // Verify + verify(listener).onFailure(any(OpenSearchException.class)); + } else { + // Run + captor.getValue().onFailure(exception); + // Verify + verify(listener).onFailure(exception); + } + } + + @SneakyThrows + public void testDoExecute_whenValidInput_thenUpdate() { + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval() + 1)); + request.setEndpoint(sampleManifestUrl()); + + Task task = mock(Task.class); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + when(datasourceUpdateService.getHeaderFields(request.getEndpoint())).thenReturn(datasource.getDatabase().getFields()); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + verify(datasourceFacade).getDatasource(datasource.getName()); + verify(datasourceFacade).updateDatasource(datasource); + verify(datasourceUpdateService).getHeaderFields(request.getEndpoint()); + assertEquals(request.getEndpoint(), datasource.getEndpoint()); + assertEquals(request.getUpdateInterval().days(), datasource.getSchedule().getInterval()); + verify(listener).onResponse(new AcknowledgedResponse(true)); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + + @SneakyThrows + public void testDoExecute_whenNoChangesInValues_thenNoUpdate() { + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + request.setUpdateInterval(TimeValue.timeValueDays(datasource.getSchedule().getInterval())); + request.setEndpoint(datasource.getEndpoint()); + + Task task = mock(Task.class); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + verify(datasourceFacade).getDatasource(datasource.getName()); + verify(datasourceUpdateService, never()).getHeaderFields(anyString()); + verify(datasourceFacade, never()).updateDatasource(datasource); + verify(listener).onResponse(new AcknowledgedResponse(true)); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + + @SneakyThrows + public void testDoExecute_whenNoDatasource_thenError() { + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + + Task task = mock(Task.class); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(exceptionCaptor.capture()); + assertEquals(ResourceNotFoundException.class, exceptionCaptor.getValue().getClass()); + exceptionCaptor.getValue().getMessage().contains("no such datasource exist"); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + + @SneakyThrows + public void testDoExecute_whenIncompatibleFields_thenError() { + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + request.setEndpoint(sampleManifestUrl()); + + Task task = mock(Task.class); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + List newFields = datasource.getDatabase().getFields().subList(0, 0); + when(datasourceUpdateService.getHeaderFields(request.getEndpoint())).thenReturn(newFields); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(exceptionCaptor.capture()); + assertEquals(OpenSearchException.class, exceptionCaptor.getValue().getClass()); + exceptionCaptor.getValue().getMessage().contains("does not contain"); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + + @SneakyThrows + public void testDoExecute_whenInvalidUpdateInterval_thenError() { + Datasource datasource = randomDatasource(); + UpdateDatasourceRequest request = new UpdateDatasourceRequest(datasource.getName()); + request.setUpdateInterval(TimeValue.timeValueDays(datasource.getDatabase().getValidForInDays())); + + Task task = mock(Task.class); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + ActionListener listener = mock(ActionListener.class); + LockModel lockModel = randomLockModel(); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + // Run + captor.getValue().onResponse(lockModel); + + // Verify + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener).onFailure(exceptionCaptor.capture()); + assertEquals(InvalidParameterException.class, exceptionCaptor.getValue().getClass()); + exceptionCaptor.getValue().getMessage().contains("should be smaller"); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index 6e51c926..108029fe 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -19,6 +19,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.Iterator; +import java.util.List; import lombok.SneakyThrows; @@ -139,6 +140,17 @@ public void testUpdateOrCreateGeoIpData_whenValidInput_thenSucceed() { ); } + @SneakyThrows + public void testGetHeaderFields_whenValidInput_thenReturnCorrectValue() { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + + File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + when(geoIpDataFacade.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + // Run + assertEquals(Arrays.asList("country_name"), datasourceUpdateService.getHeaderFields(manifestFile.toURI().toURL().toExternalForm())); + } + @SneakyThrows public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { String datasourceName = GeospatialTestHelper.randomLowerCaseString(); @@ -165,4 +177,18 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { assertEquals(currentIndex, datasource.getIndices().get(0)); verify(datasourceFacade).updateDatasource(datasource); } + + @SneakyThrows + public void testGetHeaderFields_whenValidInput_thenSucceed() { + File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile()); + File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); + when(geoIpDataFacade.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180)); + + // Run + List fields = datasourceUpdateService.getHeaderFields(manifestFile.toURI().toURL().toExternalForm()); + + // Verify + List expectedFields = Arrays.asList("country_name"); + assertEquals(expectedFields, fields); + } } diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index cc31f5a7..ffd70090 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -34,6 +34,7 @@ import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; +import org.opensearch.geospatial.ip2geo.action.RestUpdateDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; @@ -62,7 +63,8 @@ public class GeospatialPluginTests extends OpenSearchTestCase { new RestUploadGeoJSONAction(), new RestUploadStatsAction(), new RestPutDatasourceHandler(clusterSettings), - new RestGetDatasourceHandler() + new RestGetDatasourceHandler(), + new RestUpdateDatasourceHandler() ); private final Set SUPPORTED_SYSTEM_INDEX_PATTERN = Set.of(IP2GEO_DATA_INDEX_NAME_PREFIX);