From 82c4c980b588f29514298350598400f2e0134be6 Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 3 May 2023 11:36:14 -0700 Subject: [PATCH] Implement get datasource api Signed-off-by: Heemin Kim --- .../ip2geo/action/GetDatasourceAction.java | 29 +++++ .../ip2geo/action/GetDatasourceRequest.java | 69 +++++++++++ .../ip2geo/action/GetDatasourceResponse.java | 92 ++++++++++++++ .../action/GetDatasourceTransportAction.java | 76 ++++++++++++ .../action/RestGetDatasourceHandler.java | 48 ++++++++ .../action/RestPutDatasourceHandler.java | 6 +- .../ip2geo/common/DatasourceFacade.java | 74 ++++++++++++ .../ip2geo/jobscheduler/Datasource.java | 112 +++++++++++++++--- .../geospatial/plugin/GeospatialPlugin.java | 13 +- .../geospatial/ip2geo/Ip2GeoTestCase.java | 40 +++++++ .../action/GetDatasourceRequestTests.java | 46 +++++++ .../action/GetDatasourceResponseTests.java | 99 ++++++++++++++++ .../GetDatasourceTransportActionTests.java | 88 ++++++++++++++ .../action/RestGetDatasourceHandlerTests.java | 81 +++++++++++++ 14 files changed, 849 insertions(+), 24 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java new file mode 100644 index 00000000..09f90934 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.action.ActionType; + +/** + * Ip2Geo datasource get action + */ +public class GetDatasourceAction extends ActionType { + /** + * Get datasource action instance + */ + public static final GetDatasourceAction INSTANCE = new GetDatasourceAction(); + /** + * Name of a get datasource action + */ + public static final String NAME = "cluster:admin/geospatial/datasource/get"; + + private GetDatasourceAction() { + super(NAME, GetDatasourceResponse::new); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java new file mode 100644 index 00000000..c686d786 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequest.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +/** + * Ip2Geo datasource get request + */ +@Getter +@Setter +@Log4j2 +@EqualsAndHashCode +public class GetDatasourceRequest extends ActionRequest { + /** + * @param names the datasource names + * @return the datasource names + */ + private String[] names; + + /** + * Constructs a new get datasource request with a list of datasources. + * + * If the list of datasources is empty or it contains a single element "_all", all registered datasources + * are returned. + * + * @param names list of datasource names + */ + public GetDatasourceRequest(final String[] names) { + this.names = names; + } + + /** + * Constructor with stream input + * @param in the stream input + * @throws IOException IOException + */ + public GetDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.names = in.readStringArray(); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(names); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java new file mode 100644 index 00000000..bcab0912 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponse.java @@ -0,0 +1,92 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.time.Instant; +import java.util.List; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; + +/** + * Ip2Geo datasource get request + */ +@Getter +@Setter +@Log4j2 +@EqualsAndHashCode +public class GetDatasourceResponse extends ActionResponse implements ToXContentObject { + private static final String FIELD_NAME_DATASOURCES = "datasources"; + private static final String FIELD_NAME_NAME = "name"; + private static final String FIELD_NAME_STATE = "state"; + private static final String FIELD_NAME_ENDPOINT = "endpoint"; + private static final String FIELD_NAME_UPDATE_INTERVAL = "update_interval_in_days"; + private static final String FIELD_NAME_NEXT_UPDATE_AT = "next_update_at_in_epoch_millis"; + private static final String FIELD_NAME_NEXT_UPDATE_AT_READABLE = "next_update_at"; + private static final String FIELD_NAME_DATABASE = "database"; + private static final String FIELD_NAME_UPDATE_STATS = "update_stats"; + private List datasources; + + /** + * Default constructor + * + * @param datasources List of datasources + */ + public GetDatasourceResponse(final List datasources) { + this.datasources = datasources; + } + + /** + * Constructor with StreamInput + * + * @param in the stream input + */ + public GetDatasourceResponse(final StreamInput in) throws IOException { + datasources = in.readList(Datasource::new); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeList(datasources); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + builder.startArray(FIELD_NAME_DATASOURCES); + for (Datasource datasource : datasources) { + builder.startObject(); + builder.field(FIELD_NAME_NAME, datasource.getName()); + builder.field(FIELD_NAME_STATE, datasource.getState()); + builder.field(FIELD_NAME_ENDPOINT, datasource.getEndpoint()); + builder.field(FIELD_NAME_UPDATE_INTERVAL, datasource.getSchedule().getInterval()); + builder.timeField( + FIELD_NAME_NEXT_UPDATE_AT, + FIELD_NAME_NEXT_UPDATE_AT_READABLE, + datasource.getSchedule().getNextExecutionTime(Instant.now()).toEpochMilli() + ); + builder.field(FIELD_NAME_DATABASE, datasource.getDatabase()); + builder.field(FIELD_NAME_UPDATE_STATS, datasource.getUpdateStats()); + builder.endObject(); + } + builder.endArray(); + builder.endObject(); + return builder; + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java new file mode 100644 index 00000000..b7c030f3 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportAction.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.List; + +import lombok.extern.log4j.Log4j2; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport action to get datasource + */ +@Log4j2 +public class GetDatasourceTransportAction extends HandledTransportAction { + private final DatasourceFacade datasourceFacade; + + /** + * Default constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param datasourceFacade the datasource facade + */ + @Inject + public GetDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final DatasourceFacade datasourceFacade + ) { + super(GetDatasourceAction.NAME, transportService, actionFilters, GetDatasourceRequest::new); + this.datasourceFacade = datasourceFacade; + } + + @Override + protected void doExecute(final Task task, final GetDatasourceRequest request, final ActionListener listener) { + if (request.getNames().length == 0 || (request.getNames().length == 1 && "_all".equals(request.getNames()[0]))) { + // We don't expect too many data sources. Therefore, querying all data sources without pagination should be fine. + datasourceFacade.getAllDatasources(new ActionListener<>() { + @Override + public void onResponse(final List datasources) { + listener.onResponse(new GetDatasourceResponse(datasources)); + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + }); + } else { + datasourceFacade.getDatasources(request.getNames(), new ActionListener<>() { + @Override + public void onResponse(final List datasources) { + listener.onResponse(new GetDatasourceResponse(datasources)); + } + + @Override + public void onFailure(final Exception e) { + listener.onFailure(e); + } + }); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java new file mode 100644 index 00000000..883e5c96 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandler.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; +import static org.opensearch.rest.RestRequest.Method.GET; + +import java.util.List; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +/** + * Rest handler for Ip2Geo datasource get request + */ +public class RestGetDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "ip2geo_datasource_get"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final String[] names = request.paramAsStringArray("name", Strings.EMPTY_ARRAY); + final GetDatasourceRequest getDatasourceRequest = new GetDatasourceRequest(names); + + return channel -> client.executeLocally(GetDatasourceAction.INSTANCE, getDatasourceRequest, new RestToXContentListener<>(channel)); + } + + @Override + public List routes() { + String path1 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource"); + String path2 = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}"); + return List.of(new Route(GET, path1), new Route(GET, path2)); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java index 3ccffa06..d35b7751 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java @@ -39,7 +39,7 @@ * */ public class RestPutDatasourceHandler extends BaseRestHandler { - private static final String ACTION_NAME = "ip2geo_datasource"; + private static final String ACTION_NAME = "ip2geo_datasource_put"; private final ClusterSettings clusterSettings; public RestPutDatasourceHandler(final ClusterSettings clusterSettings) { @@ -53,7 +53,7 @@ public String getName() { @Override protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { - final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id")); + final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("name")); if (request.hasContentOrSourceParam()) { try (XContentParser parser = request.contentOrSourceParamParser()) { PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); @@ -70,7 +70,7 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No @Override public List routes() { - String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}"); + String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{name}"); return List.of(new Route(PUT, path)); } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index b16e5478..e65a0566 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -10,6 +10,8 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; import lombok.extern.log4j.Log4j2; @@ -17,8 +19,11 @@ import org.opensearch.action.DocWriteRequest; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.get.MultiGetItemResponse; +import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequestBuilder; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.search.SearchResponse; import org.opensearch.client.Client; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.xcontent.LoggingDeprecationHandler; @@ -30,6 +35,8 @@ import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; /** * Facade class for datasource @@ -120,4 +127,71 @@ public void onFailure(final Exception e) { } }); } + + /** + * Get datasources from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param names the array of datasource names + * @param actionListener the action listener + */ + public void getDatasources(final String[] names, final ActionListener> actionListener) { + client.prepareMultiGet().add(DatasourceExtension.JOB_INDEX_NAME, names).execute(new ActionListener<>() { + @Override + public void onResponse(final MultiGetResponse response) { + List datasources = new ArrayList<>(response.getResponses().length); + for (MultiGetItemResponse item : response.getResponses()) { + if (item.getResponse() != null && item.getResponse().isExists()) { + try { + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + item.getResponse().getSourceAsBytesRef() + ); + datasources.add(Datasource.PARSER.parse(parser, null)); + } catch (IOException e) { + actionListener.onFailure(e); + return; + } + } + } + actionListener.onResponse(datasources); + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }); + } + + /** + * Get all datasources from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param actionListener the action listener + */ + public void getAllDatasources(final ActionListener> actionListener) { + client.prepareSearch(DatasourceExtension.JOB_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).execute(new ActionListener<>() { + @Override + public void onResponse(final SearchResponse searchResponse) { + List datasources = new ArrayList<>(searchResponse.getHits().getHits().length); + for (SearchHit searchHit : searchResponse.getHits().getHits()) { + try { + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + searchHit.getSourceRef() + ); + datasources.add(Datasource.PARSER.parse(parser, null)); + } catch (IOException e) { + actionListener.onFailure(e); + return; + } + } + actionListener.onResponse(datasources); + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }); + } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index dfc26d71..dbaf118c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -25,6 +25,9 @@ import lombok.Setter; import lombok.ToString; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; import org.opensearch.core.ParseField; import org.opensearch.core.xcontent.ConstructingObjectParser; import org.opensearch.core.xcontent.ToXContent; @@ -45,7 +48,7 @@ @ToString @EqualsAndHashCode @AllArgsConstructor -public class Datasource implements ScheduledJobParameter { +public class Datasource implements Writeable, ScheduledJobParameter { /** * Prefix of indices having Ip2Geo data */ @@ -58,13 +61,13 @@ public class Datasource implements ScheduledJobParameter { /** * Default fields for job scheduling */ - private static final ParseField ID_FIELD = new ParseField("id"); + private static final ParseField NAME_FIELD = new ParseField("name"); private static final ParseField ENABLED_FILED = new ParseField("update_enabled"); - private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time"); - private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field"); + private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time_in_epoch_millis"); + private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time"); private static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); - private static final ParseField ENABLED_TIME_FILED = new ParseField("enabled_time"); - private static final ParseField ENABLED_TIME_FILED_READABLE = new ParseField("enabled_time_field"); + private static final ParseField ENABLED_TIME_FILED = new ParseField("enabled_time_in_epoch_millis"); + private static final ParseField ENABLED_TIME_FILED_READABLE = new ParseField("enabled_time"); /** * Additional fields for datasource @@ -81,7 +84,7 @@ public class Datasource implements ScheduledJobParameter { /** * @param id Id of a datasource - * @return Id of a datasource + * @return id of a datasource */ private String id; /** @@ -169,7 +172,7 @@ public class Datasource implements ScheduledJobParameter { } ); static { - PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), NAME_FIELD); PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_UPDATE_TIME_FIELD); PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ENABLED_TIME_FILED); PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FILED); @@ -202,10 +205,37 @@ public Datasource(final String id, final IntervalSchedule schedule, final String ); } + public Datasource(final StreamInput in) throws IOException { + id = in.readString(); + lastUpdateTime = toInstant(in.readLong()); + enabledTime = toInstant(in.readOptionalLong()); + isEnabled = in.readBoolean(); + schedule = new IntervalSchedule(in); + endpoint = in.readString(); + state = DatasourceState.valueOf(in.readString()); + indices = in.readStringList(); + database = new Database(in); + updateStats = new UpdateStats(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeString(id); + out.writeLong(lastUpdateTime.toEpochMilli()); + out.writeOptionalLong(enabledTime == null ? null : enabledTime.toEpochMilli()); + out.writeBoolean(isEnabled); + schedule.writeTo(out); + out.writeString(endpoint); + out.writeString(state.name()); + out.writeStringCollection(indices); + database.writeTo(out); + updateStats.writeTo(out); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); - builder.field(ID_FIELD.getPreferredName(), id); + builder.field(NAME_FIELD.getPreferredName(), id); builder.timeField( LAST_UPDATE_TIME_FIELD.getPreferredName(), LAST_UPDATE_TIME_FIELD_READABLE.getPreferredName(), @@ -378,6 +408,18 @@ public boolean isCompatible(final List fields) { return true; } + /** + * Convert long to instant + * + * This method is static so that it can be used in child class + * + * @param epochMilli the epoch milliseconds + * @return the instant + */ + private static Instant toInstant(final Long epochMilli) { + return epochMilli == null ? null : Instant.ofEpochMilli(epochMilli); + } + /** * Database of a datasource */ @@ -387,11 +429,11 @@ public boolean isCompatible(final List fields) { @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) - public static class Database implements ToXContent { + public static class Database implements Writeable, ToXContent { private static final ParseField PROVIDER_FIELD = new ParseField("provider"); private static final ParseField MD5_HASH_FIELD = new ParseField("md5_hash"); - private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at"); - private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at_field"); + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at_in_epoch_millis"); + private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at"); private static final ParseField FIELDS_FIELD = new ParseField("fields"); private static final ParseField VALID_FOR_IN_DAYS_FIELD = new ParseField("valid_for_in_days"); @@ -441,6 +483,23 @@ public static class Database implements ToXContent { PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), FIELDS_FIELD); } + public Database(final StreamInput in) throws IOException { + provider = in.readOptionalString(); + md5Hash = in.readOptionalString(); + updatedAt = toInstant(in.readOptionalLong()); + validForInDays = in.readOptionalLong(); + fields = in.readOptionalStringList(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalString(provider); + out.writeOptionalString(md5Hash); + out.writeOptionalLong(updatedAt.toEpochMilli()); + out.writeOptionalLong(validForInDays); + out.writeOptionalStringCollection(fields); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); @@ -481,14 +540,14 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa @EqualsAndHashCode @AllArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE) - public static class UpdateStats implements ToXContent { - private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at"); - private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at_field"); + public static class UpdateStats implements Writeable, ToXContent { + private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at_in_epoch_millis"); + private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at"); private static final ParseField LAST_PROCESSING_TIME_IN_MILLIS_FIELD = new ParseField("last_processing_time_in_millis"); - private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at"); - private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at_field"); - private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at"); - private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at_field"); + private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at_in_epoch_millis"); + private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at"); + private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at_in_epoch_millis"); + private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at"); /** * @param lastSucceededAt The last time when GeoIP data update was succeeded @@ -530,6 +589,21 @@ public static class UpdateStats implements ToXContent { PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SKIPPED_AT); } + public UpdateStats(final StreamInput in) throws IOException { + lastSucceededAt = toInstant(in.readOptionalLong()); + lastProcessingTimeInMillis = in.readOptionalLong(); + lastFailedAt = toInstant(in.readOptionalLong()); + lastSkippedAt = toInstant(in.readOptionalLong()); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + out.writeOptionalLong(lastSucceededAt.toEpochMilli()); + out.writeOptionalLong(lastProcessingTimeInMillis); + out.writeOptionalLong(lastFailedAt.toEpochMilli()); + out.writeOptionalLong(lastSkippedAt.toEpochMilli()); + } + @Override public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { builder.startObject(); diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 66752fc5..fc29b165 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -36,8 +36,11 @@ import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper; import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldTypeParser; import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder; +import org.opensearch.geospatial.ip2geo.action.GetDatasourceAction; +import org.opensearch.geospatial.ip2geo.action.GetDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; +import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; @@ -153,7 +156,12 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - return List.of(new RestUploadStatsAction(), new RestUploadGeoJSONAction(), new RestPutDatasourceHandler(clusterSettings)); + return List.of( + new RestUploadStatsAction(), + new RestUploadGeoJSONAction(), + new RestPutDatasourceHandler(clusterSettings), + new RestGetDatasourceHandler() + ); } @Override @@ -161,7 +169,8 @@ public List getRestHandlers( return List.of( new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class), new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class), - new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class) + new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class), + new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class) ); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index c1aec21e..4ba50795 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -13,6 +13,8 @@ import java.io.File; import java.nio.file.Paths; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.Arrays; import java.util.HashSet; import java.util.Locale; @@ -39,13 +41,16 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.OpenSearchExecutors; +import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; import org.opensearch.ingest.IngestService; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskListener; @@ -120,6 +125,13 @@ public DatasourceState randomStateExcept(DatasourceState state) { .get(Randomness.createSecure().nextInt(DatasourceState.values().length - 2)); } + public DatasourceState randomState() { + return Arrays.stream(DatasourceState.values()) + .sequential() + .collect(Collectors.toList()) + .get(Randomness.createSecure().nextInt(DatasourceState.values().length - 1)); + } + public String randomIpAddress() { return String.format( Locale.ROOT, @@ -149,6 +161,34 @@ public File sampleIp2GeoFile() { return new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile()); } + public Datasource randomDatasource() { + Datasource datasource = new Datasource(); + datasource.setId(GeospatialTestHelper.randomLowerCaseString()); + datasource.setSchedule( + new IntervalSchedule(Instant.now().truncatedTo(ChronoUnit.MILLIS), Randomness.get().nextInt(30) + 1, ChronoUnit.DAYS) + ); + datasource.setState(randomState()); + datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); + datasource.setEndpoint(GeospatialTestHelper.randomLowerCaseString()); + datasource.getDatabase() + .setFields(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); + datasource.getDatabase().setProvider(GeospatialTestHelper.randomLowerCaseString()); + datasource.getDatabase().setUpdatedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getDatabase().setMd5Hash(GeospatialTestHelper.randomLowerCaseString()); + datasource.getDatabase().setValidForInDays(Randomness.get().nextInt(30) + 1l); + datasource.getUpdateStats().setLastSkippedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getUpdateStats().setLastSucceededAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getUpdateStats().setLastFailedAt(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + datasource.getUpdateStats().setLastProcessingTimeInMillis(Randomness.get().nextLong()); + datasource.setLastUpdateTime(Instant.now().truncatedTo(ChronoUnit.MILLIS)); + if (Randomness.get().nextInt() % 2 == 0) { + datasource.enable(); + } else { + datasource.disable(); + } + return datasource; + } + /** * Temporary class of VerifyingClient until this PR(https://github.com/opensearch-project/OpenSearch/pull/7167) * is merged in OpenSearch core diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java new file mode 100644 index 00000000..4509e85c --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceRequestTests.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class GetDatasourceRequestTests extends Ip2GeoTestCase { + public void testStreamInOutWithEmptyNames() throws Exception { + String[] empty = new String[0]; + GetDatasourceRequest request = new GetDatasourceRequest(empty); + assertNull(request.validate()); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceRequest copiedRequest = new GetDatasourceRequest(input); + + // Verify + assertEquals(request, copiedRequest); + } + + public void testStreamInOut() throws Exception { + String[] empty = { GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString() }; + GetDatasourceRequest request = new GetDatasourceRequest(empty); + assertNull(request.validate()); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceRequest copiedRequest = new GetDatasourceRequest(input); + + // Verify + assertEquals(request, copiedRequest); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java new file mode 100644 index 00000000..2e2d5799 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceResponseTests.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import org.opensearch.common.Strings; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; + +public class GetDatasourceResponseTests extends Ip2GeoTestCase { + + public void testStreamInOut() throws Exception { + List datasourceList = Arrays.asList(randomDatasource(), randomDatasource()); + GetDatasourceResponse response = new GetDatasourceResponse(datasourceList); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + response.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + GetDatasourceResponse copiedResponse = new GetDatasourceResponse(input); + + // Verify + assertArrayEquals(response.getDatasources().toArray(), copiedResponse.getDatasources().toArray()); + } + + public void testXContent() throws Exception { + List datasourceList = Arrays.asList(randomDatasource(), randomDatasource()); + GetDatasourceResponse response = new GetDatasourceResponse(datasourceList); + String json = Strings.toString(response.toXContent(JsonXContent.contentBuilder(), null)); + for (Datasource datasource : datasourceList) { + assertTrue(json.contains(String.format(Locale.ROOT, "\"name\":\"%s\"", datasource.getName()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"state\":\"%s\"", datasource.getState()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"endpoint\":\"%s\"", datasource.getEndpoint()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"update_interval_in_days\":%d", datasource.getSchedule().getInterval()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"next_update_at_in_epoch_millis\""))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"provider\":\"%s\"", datasource.getDatabase().getProvider()))); + assertTrue(json.contains(String.format(Locale.ROOT, "\"md5_hash\":\"%s\"", datasource.getDatabase().getMd5Hash()))); + assertTrue( + json.contains( + String.format(Locale.ROOT, "\"updated_at_in_epoch_millis\":%d", datasource.getDatabase().getUpdatedAt().toEpochMilli()) + ) + ); + assertTrue(json.contains(String.format(Locale.ROOT, "\"valid_for_in_days\":%d", datasource.getDatabase().getValidForInDays()))); + for (String field : datasource.getDatabase().getFields()) { + assertTrue(json.contains(field)); + } + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_succeeded_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastSucceededAt().toEpochMilli() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_processing_time_in_millis\":%d", + datasource.getUpdateStats().getLastProcessingTimeInMillis() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_failed_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastFailedAt().toEpochMilli() + ) + ) + ); + assertTrue( + json.contains( + String.format( + Locale.ROOT, + "\"last_skipped_at_in_epoch_millis\":%d", + datasource.getUpdateStats().getLastSkippedAt().toEpochMilli() + ) + ) + ); + + } + } + +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java new file mode 100644 index 00000000..1f7042f5 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceTransportActionTests.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.action.ActionListener; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.tasks.Task; + +public class GetDatasourceTransportActionTests extends Ip2GeoTestCase { + private GetDatasourceTransportAction action; + + @Before + public void init() { + action = new GetDatasourceTransportAction(transportService, actionFilters, datasourceFacade); + } + + public void testDoExecuteWithGetAll() throws Exception { + Task task = mock(Task.class); + GetDatasourceRequest request = new GetDatasourceRequest(new String[] { "_all" }); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getAllDatasources(captor.capture()); + + // Run + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + captor.getValue().onResponse(datasources); + + // Verify + verify(listener).onResponse(new GetDatasourceResponse(datasources)); + + // Run + RuntimeException exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } + + public void testDoExecuteWithNames() { + Task task = mock(Task.class); + List datasources = Arrays.asList(randomDatasource(), randomDatasource()); + String[] datasourceNames = datasources.stream().map(Datasource::getName).toArray(String[]::new); + + GetDatasourceRequest request = new GetDatasourceRequest(datasourceNames); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor>> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(datasourceFacade).getDatasources(eq(datasourceNames), captor.capture()); + + // Run + captor.getValue().onResponse(datasources); + + // Verify + verify(listener).onResponse(new GetDatasourceResponse(datasources)); + + // Run + RuntimeException exception = new RuntimeException(); + captor.getValue().onFailure(exception); + + // Verify + verify(listener).onFailure(exception); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java new file mode 100644 index 00000000..523def50 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestGetDatasourceHandlerTests.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; + +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.StringUtils; +import org.junit.Before; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +public class RestGetDatasourceHandlerTests extends RestActionTestCase { + private String PATH_FOR_ALL = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource"); + private String path; + private RestGetDatasourceHandler action; + + @Before + public void setupAction() { + action = new RestGetDatasourceHandler(); + controller().registerHandler(action); + path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/%s"); + } + + public void testPrepareRequest() { + String dsName1 = GeospatialTestHelper.randomLowerCaseString(); + String dsName2 = GeospatialTestHelper.randomLowerCaseString(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath(String.format(Locale.ROOT, path, StringUtils.joinWith(",", dsName1, dsName2))) + .build(); + + AtomicBoolean isExecuted = new AtomicBoolean(false); + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof GetDatasourceRequest); + GetDatasourceRequest getDatasourceRequest = (GetDatasourceRequest) actionRequest; + assertArrayEquals(new String[] { dsName1, dsName2 }, getDatasourceRequest.getNames()); + isExecuted.set(true); + return null; + }); + + // Run + dispatchRequest(request); + + // Verify + assertTrue(isExecuted.get()); + } + + public void testPrepareRequestForAll() { + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.GET) + .withPath(PATH_FOR_ALL) + .build(); + + AtomicBoolean isExecuted = new AtomicBoolean(false); + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + // Verifying + assertTrue(actionRequest instanceof GetDatasourceRequest); + GetDatasourceRequest getDatasourceRequest = (GetDatasourceRequest) actionRequest; + assertArrayEquals(new String[] {}, getDatasourceRequest.getNames()); + isExecuted.set(true); + return null; + }); + + // Run + dispatchRequest(request); + + // Verify + assertTrue(isExecuted.get()); + } +}