diff --git a/build.gradle b/build.gradle index 1ae2da18..643ecf65 100644 --- a/build.gradle +++ b/build.gradle @@ -5,6 +5,8 @@ import org.opensearch.gradle.test.RestIntegTestTask +import java.util.concurrent.Callable + apply plugin: 'java' apply plugin: 'idea' apply plugin: 'opensearch.opensearchplugin' @@ -35,6 +37,7 @@ opensearchplugin { classname "${projectPath}.${pathToPlugin}.${pluginClassName}" licenseFile rootProject.file('LICENSE') noticeFile rootProject.file('NOTICE') + extendedPlugins = ['opensearch-job-scheduler'] } // This requires an additional Jar not published as part of build-tools @@ -142,6 +145,10 @@ publishing { } +configurations { + zipArchive +} + //****************************************************************************/ // Dependencies //****************************************************************************/ @@ -154,6 +161,11 @@ dependencies { implementation "org.apache.commons:commons-lang3:3.12.0" implementation "org.locationtech.spatial4j:spatial4j:${versions.spatial4j}" implementation "org.locationtech.jts:jts-core:${versions.jts}" + api("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}") + api("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}") + implementation "org.apache.commons:commons-csv:1.10.0" + zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}" + compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}" } licenseHeaders.enabled = true @@ -206,8 +218,6 @@ integTest { testClusters.integTest { testDistribution = "ARCHIVE" - // This installs our plugin into the testClusters - plugin(project.tasks.bundlePlugin.archiveFile) // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 if (_numNodes > 1) numberOfNodes = _numNodes // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore @@ -220,6 +230,49 @@ testClusters.integTest { debugPort += 1 } } + + // This installs our plugin into the testClusters + plugin(project.tasks.bundlePlugin.archiveFile) + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + })) + + // opensearch-geospatial plugin is being added to the list of plugins for the testCluster during build before + // the opensearch-job-scheduler plugin, which is causing build failures. From the stack trace, this looks like a bug. + // + // Exception in thread "main" java.lang.IllegalArgumentException: Missing plugin [opensearch-job-scheduler], dependency of [opensearch-geospatial] + // at org.opensearch.plugins.PluginsService.addSortedBundle(PluginsService.java:515) + // + // A temporary hack is to reorder the plugins list after evaluation but prior to task execution when the plugins are installed. + // See https://github.com/opensearch-project/anomaly-detection/blob/fd547014fdde5114bbc9c8e49fe7aaa37eb6e793/build.gradle#L400-L422 + nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } +} + +testClusters.yamlRestTest { + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + })) } run { diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceAction.java new file mode 100644 index 00000000..3e7b1ea8 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceAction.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * GeoIp datasource creation action + */ +public class PutDatasourceAction extends ActionType { + /** + * Put datasource action instance + */ + public static final PutDatasourceAction INSTANCE = new PutDatasourceAction(); + /** + * Name of a put datasource action + */ + public static final String NAME = "cluster:admin/geospatial/datasource"; + + private PutDatasourceAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java new file mode 100644 index 00000000..f1a12169 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java @@ -0,0 +1,141 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Locale; + +import lombok.Getter; +import lombok.Setter; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.master.AcknowledgedRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ObjectParser; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; + +/** + * GeoIP datasource creation request + */ +@Getter +@Setter +public class PutDatasourceRequest extends AcknowledgedRequest { + private static final Logger LOGGER = LogManager.getLogger(PutDatasourceRequest.class); + private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days"); + /** + * @param id Datasource name + * @return Datasource name + */ + private String id; + /** + * @param endpoint URL to a manifest file for a datasource + * @return URL to a manifest file for a datasource + */ + private String endpoint; + /** + * @param updateIntervalInDays Update interval of a datasource + * @return Update interval of a datasource + */ + private TimeValue updateIntervalInDays; + + /** + * Parser of a datasource + */ + public static final ObjectParser PARSER; + static { + PARSER = new ObjectParser<>("put_datasource"); + PARSER.declareString((request, val) -> request.setEndpoint(val), ENDPOINT_FIELD); + PARSER.declareLong((request, val) -> request.setUpdateIntervalInDays(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD); + } + + /** + * Default constructor + * @param id name of a datasource + */ + public PutDatasourceRequest(final String id) { + this.id = id; + } + + /** + * Constructor with stream input + * @param in the stream input + * @throws IOException IOException + */ + public PutDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.id = in.readString(); + this.endpoint = in.readString(); + this.updateIntervalInDays = in.readTimeValue(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(id); + out.writeString(endpoint); + out.writeTimeValue(updateIntervalInDays); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = new ActionRequestValidationException(); + validateEndpoint(errors); + validateUpdateInterval(errors); + return errors.validationErrors().isEmpty() ? null : errors; + } + + private void validateEndpoint(final ActionRequestValidationException errors) { + try { + URL url = new URL(endpoint); + url.toURI(); // Validate URL complies with RFC-2396 + validateManifestFile(url, errors); + } catch (MalformedURLException | URISyntaxException e) { + LOGGER.info("Invalid URL format is provided", e); + errors.addValidationError("Invalid URL format is provided"); + } + } + + private void validateManifestFile(final URL url, final ActionRequestValidationException errors) { + try { + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + new URL(manifest.getUrl()).toURI(); // Validate URL complies with RFC-2396 + if (manifest.getValidForInDays() <= updateIntervalInDays.days()) { + errors.addValidationError( + String.format( + Locale.ROOT, + "updateInterval %d is should be smaller than %d", + updateIntervalInDays.days(), + manifest.getValidForInDays() + ) + ); + } + } catch (MalformedURLException | URISyntaxException e) { + LOGGER.info("Invalid URL format is provided for url field in the manifest file", e); + errors.addValidationError("Invalid URL format is provided for url field in the manifest file"); + } catch (Exception e) { + LOGGER.info("Error occurred while reading a file from {}", url, e); + errors.addValidationError(String.format(Locale.ROOT, "Error occurred while reading a file from %s", url)); + } + } + + private void validateUpdateInterval(final ActionRequestValidationException errors) { + if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) > 0) { + errors.addValidationError("Update interval should be equal to or larger than 1 day"); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java new file mode 100644 index 00000000..439e3109 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java @@ -0,0 +1,183 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.index.engine.VersionConflictEngineException; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +/** + * Transport action to create datasource + */ +public class PutDatasourceTransportAction extends HandledTransportAction { + private static final Logger LOGGER = LogManager.getLogger(PutDatasourceTransportAction.class); + private final Client client; + private final ClusterService clusterService; + private final ThreadPool threadPool; + + private TimeValue timeout; + private int indexingBulkSize; + + /** + * Default constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param client the client + * @param clusterService the cluster service + * @param threadPool the thread pool + * @param settings the settings + * @param clusterSettings the cluster settings + */ + @Inject + public PutDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final Client client, + final ClusterService clusterService, + final ThreadPool threadPool, + final Settings settings, + final ClusterSettings clusterSettings + ) { + super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new); + this.client = client; + this.clusterService = clusterService; + this.threadPool = threadPool; + timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(settings); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, it -> timeout = it); + indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.INDEXING_BULK_SIZE, it -> indexingBulkSize = it); + } + + @Override + protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener listener) { + try { + Datasource jobParameter = Datasource.Builder.build(request); + IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME) + .id(jobParameter.getId()) + .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .opType(DocWriteRequest.OpType.CREATE); + client.index(indexRequest, new ActionListener<>() { + @Override + public void onResponse(final IndexResponse indexResponse) { + threadPool.generic().submit(() -> { + try { + createDatasource(jobParameter); + } catch (Exception e) { + LOGGER.error("Failed to create datasource for {}", jobParameter.getId(), e); + jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); + jobParameter.setState(DatasourceState.FAILED); + try { + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + } catch (Exception ex) { + LOGGER.error("Failed to mark datasource state as FAILED for {}", jobParameter.getId(), ex); + } + } + }); + listener.onResponse(new AcknowledgedResponse(true)); + } + + @Override + public void onFailure(final Exception e) { + if (e instanceof VersionConflictEngineException) { + LOGGER.info("Datasource already exists {}", request.getId(), e); + listener.onFailure(new ResourceAlreadyExistsException("Datasource already exists")); + } else { + LOGGER.error("Failed to create a datasource {}", request.getId(), e); + listener.onFailure(new OpenSearchException("Failed to create a datasource")); + } + } + }); + } catch (Exception e) { + LOGGER.error("Error occurred while creating datasource {}", request.getId(), e); + listener.onFailure(new OpenSearchException("Failed to create a datasource")); + } + } + + private void createDatasource(final Datasource jobParameter) throws Exception { + if (!DatasourceState.PREPARING.equals(jobParameter.getState())) { + LOGGER.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState()); + jobParameter.setState(DatasourceState.FAILED); + jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + return; + } + + URL url = new URL(jobParameter.getEndpoint()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + Instant startTime = Instant.now(); + String indexName = jobParameter.indexNameFor(manifest); + jobParameter.getIndices().add(indexName); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + GeoIpDataHelper.createIndex(clusterService, client, indexName, timeout); + String[] fields; + try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) { + Iterator iter = reader.iterator(); + fields = iter.next().values(); + GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout); + } + + Instant endTime = Instant.now(); + jobParameter.setDatabase( + new Datasource.Database( + manifest.getProvider(), + manifest.getMd5Hash(), + Instant.ofEpochMilli(manifest.getUpdatedAt()), + manifest.getValidForInDays(), + Arrays.asList(fields) + ) + ); + jobParameter.getUpdateStats().setLastSucceededAt(endTime); + jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); + jobParameter.enable(); + jobParameter.setState(DatasourceState.AVAILABLE); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + LOGGER.info( + "GeoIP database creation succeeded for {} and took {} seconds", + jobParameter.getId(), + Duration.between(startTime, endTime) + ); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceAction.java new file mode 100644 index 00000000..05a6365c --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceAction.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; +import static org.opensearch.rest.RestRequest.Method.PUT; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +/** + * Rest handler for Ip2Geo datasource creation + */ +public class RestPutDatasourceAction extends BaseRestHandler { + private String defaultDatasourceEndpoint; + private TimeValue defaultUpdateInterval; + + public RestPutDatasourceAction(final Settings settings, final ClusterSettings clusterSettings) { + defaultDatasourceEndpoint = Ip2GeoSettings.DATASOURCE_ENDPOINT.get(settings); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.DATASOURCE_ENDPOINT, it -> defaultDatasourceEndpoint = it); + defaultUpdateInterval = Ip2GeoSettings.DATASOURCE_UPDATE_INTERVAL.get(settings); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.DATASOURCE_UPDATE_INTERVAL, it -> defaultUpdateInterval = it); + } + + @Override + public String getName() { + return "ip2geo_datasource"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException { + final PutDatasourceRequest putDatasourceRequest = new PutDatasourceRequest(request.param("id")); + if (request.hasContentOrSourceParam()) { + try (XContentParser parser = request.contentOrSourceParamParser()) { + PutDatasourceRequest.PARSER.parse(parser, putDatasourceRequest, null); + } + } + if (putDatasourceRequest.getEndpoint() == null) { + putDatasourceRequest.setEndpoint(defaultDatasourceEndpoint); + } + if (putDatasourceRequest.getUpdateIntervalInDays() == null) { + putDatasourceRequest.setUpdateIntervalInDays(defaultUpdateInterval); + } + return channel -> client.executeLocally(PutDatasourceAction.INSTANCE, putDatasourceRequest, new RestToXContentListener<>(channel)); + } + + @Override + public List routes() { + // TODO enable when feature implementation is completed + boolean enabled = false; + String path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/{id}"); + return enabled ? List.of(new Route(PUT, path)) : new ArrayList<>(); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java new file mode 100644 index 00000000..6527d164 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelper.java @@ -0,0 +1,121 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.io.IOException; +import java.time.Instant; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexRequestBuilder; +import org.opensearch.action.index.IndexResponse; +import org.opensearch.client.Client; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.index.IndexNotFoundException; + +/** + * Helper class for datasource + */ +public class DatasourceHelper { + private static final Logger LOGGER = LogManager.getLogger(DatasourceHelper.class); + + /** + * Update datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param client the client + * @param datasource the datasource + * @param timeout the timeout + * @return index response + * @throws IOException exception + */ + public static IndexResponse updateDatasource(final Client client, final Datasource datasource, final TimeValue timeout) + throws IOException { + datasource.setLastUpdateTime(Instant.now()); + IndexRequestBuilder requestBuilder = client.prepareIndex(DatasourceExtension.JOB_INDEX_NAME); + requestBuilder.setId(datasource.getId()); + requestBuilder.setOpType(DocWriteRequest.OpType.INDEX); + requestBuilder.setSource(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)); + return client.index(requestBuilder.request()).actionGet(timeout); + } + + /** + * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param client the client + * @param id the name of a datasource + * @param timeout the timeout + * @return datasource + * @throws IOException exception + */ + public static Datasource getDatasource(final Client client, final String id, final TimeValue timeout) throws IOException { + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id); + GetResponse response; + try { + response = client.get(request).actionGet(timeout); + if (!response.isExists()) { + LOGGER.error("Datasource[{}] does not exist in an index[{}]", id, DatasourceExtension.JOB_INDEX_NAME); + return null; + } + } catch (IndexNotFoundException e) { + LOGGER.error("Index[{}] is not found", DatasourceExtension.JOB_INDEX_NAME); + return null; + } + + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getSourceAsBytesRef() + ); + return Datasource.PARSER.parse(parser, null); + } + + /** + * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} + * @param client the client + * @param id the name of a datasource + * @param actionListener the action listener + */ + public static void getDatasource(final Client client, final String id, final ActionListener actionListener) { + GetRequest request = new GetRequest(DatasourceExtension.JOB_INDEX_NAME, id); + client.get(request, new ActionListener() { + @Override + public void onResponse(final GetResponse response) { + if (!response.isExists()) { + actionListener.onResponse(null); + } else { + try { + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getSourceAsBytesRef() + ); + actionListener.onResponse(Datasource.PARSER.parse(parser, null)); + } catch (IOException e) { + actionListener.onFailure(e); + } + } + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java new file mode 100644 index 00000000..f4c4c9fe --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceManifest.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.CharBuffer; +import java.security.AccessController; +import java.security.PrivilegedAction; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import org.opensearch.SpecialPermission; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.DeprecationHandler; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; + +/** + * Ip2Geo datasource manifest file object + * + * Manifest file is stored in an external endpoint. OpenSearch read the file and store values it in this object. + */ +@Setter +@Getter +@AllArgsConstructor +public class DatasourceManifest { + private static final ParseField URL_FIELD = new ParseField("url"); + private static final ParseField DB_NAME_FIELD = new ParseField("db_name"); + private static final ParseField MD5_HASH_FIELD = new ParseField("md5_hash"); + private static final ParseField VALID_FOR_IN_DAYS_FIELD = new ParseField("valid_for_in_days"); + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at"); + private static final ParseField PROVIDER_FIELD = new ParseField("provider"); + + /** + * @param url URL of a ZIP file containing a database + * @return URL of a ZIP file containing a database + */ + private String url; + /** + * @param dbName A database file name inside the ZIP file + * @return A database file name inside the ZIP file + */ + private String dbName; + /** + * @param md5Hash MD5 hash value of a database file + * @return MD5 hash value of a database file + */ + private String md5Hash; + /** + * @param validForInDays A duration in which the database file is valid to use + * @return A duration in which the database file is valid to use + */ + private Long validForInDays; + /** + * @param updatedAt A date when the database was updated + * @return A date when the database was updated + */ + private Long updatedAt; + /** + * @param provider A database provider name + * @return A database provider name + */ + private String provider; + + /** + * Ddatasource manifest parser + */ + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datasource_manifest", + true, + args -> { + String url = (String) args[0]; + String dbName = (String) args[1]; + String md5hash = (String) args[2]; + Long validForInDays = (Long) args[3]; + Long updatedAt = (Long) args[4]; + String provider = (String) args[5]; + return new DatasourceManifest(url, dbName, md5hash, validForInDays, updatedAt, provider); + } + ); + static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), URL_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), DB_NAME_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), MD5_HASH_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VALID_FOR_IN_DAYS_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), UPDATED_AT_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), PROVIDER_FIELD); + } + + /** + * Datasource manifest builder + */ + public static class Builder { + private static final int MANIFEST_FILE_MAX_BYTES = 1024 * 8; + + /** + * Build DatasourceManifest from a given url + * + * @param url url to downloads a manifest file + * @return DatasourceManifest representing the manifest file + * @throws Exception exception + */ + @SuppressForbidden(reason = "Need to connect to http endpoint to read manifest file") + public static DatasourceManifest build(final URL url) throws Exception { + SpecialPermission.check(); + return AccessController.doPrivileged((PrivilegedAction) () -> { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream()))) { + CharBuffer charBuffer = CharBuffer.allocate(MANIFEST_FILE_MAX_BYTES); + reader.read(charBuffer); + charBuffer.flip(); + XContentParser parser = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + DeprecationHandler.IGNORE_DEPRECATIONS, + charBuffer.toString() + ); + return PARSER.parse(parser, null); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java new file mode 100644 index 00000000..27523bda --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceState.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +/** + * Ip2Geo datasource state + * + * When data source is created, it starts with PREPARING state. Once the first GeoIP data is generated, the state changes to AVAILABLE. + * Only when the first GeoIP data generation failed, the state changes to FAILED. + * Subsequent GeoIP data failure won't change data source state from AVAILABLE to FAILED. + * When delete request is received, the data source state changes to DELETING. + * + * State changed from left to right for the entire lifecycle of a datasource + * (PREPARING) to (FAILED or AVAILABLE) to (DELETING) + * + */ +public enum DatasourceState { + /** + * Data source is being prepared + */ + PREPARING, + /** + * Data source is ready to be used + */ + AVAILABLE, + /** + * Data source preparation failed + */ + FAILED, + /** + * Data source is being deleted + */ + DELETING +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java new file mode 100644 index 00000000..7cf4ae8f --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelper.java @@ -0,0 +1,357 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.security.AccessController; +import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.SpecialPermission; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.MultiSearchRequestBuilder; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.Client; +import org.opensearch.client.Requests; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.SuppressForbidden; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentHelper; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilders; + +/** + * Helper class for GeoIp data + */ +public class GeoIpDataHelper { + private static final Logger LOGGER = LogManager.getLogger(GeoIpDataHelper.class); + private static final String IP_RANGE_FIELD_NAME = "_cidr"; + private static final String DATA_FIELD_NAME = "_data"; + + /** + * Create an index of single shard with auto expand replicas to all nodes + * + * @param clusterService cluster service + * @param client client + * @param indexName index name + * @param timeout timeout + * @throws IOException io exception + */ + public static void createIndex( + final ClusterService clusterService, + final Client client, + final String indexName, + final TimeValue timeout + ) throws IOException { + if (clusterService.state().metadata().hasIndex(indexName) == true) { + LOGGER.info("Index {} already exist. Skipping creation.", indexName); + return; + } + final Map indexSettings = new HashMap<>(); + indexSettings.put("index.number_of_shards", 1); + indexSettings.put("index.auto_expand_replicas", "0-all"); + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).settings(indexSettings).mapping(getIndexMapping()); + CreateIndexResponse response = client.admin().indices().create(createIndexRequest).actionGet(timeout); + LOGGER.info("Index {} created?: {}", indexName, response.isAcknowledged()); + } + + /** + * Generate XContentBuilder representing datasource database index mapping + * + * { + * "dynamic": false, + * "properties": { + * "_cidr": { + * "type": "ip_range", + * "doc_values": false + * } + * } + * } + * + * @return String representing datasource database index mapping + */ + private static String getIndexMapping() { + try { + try (InputStream is = DatasourceHelper.class.getResourceAsStream("/mappings/ip2geo_datasource.json")) { + try (BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { + return reader.lines().collect(Collectors.joining()); + } + } + } catch (Exception e) { + throw new IllegalArgumentException("Ip2Geo datasource mapping cannot be read correctly."); + } + } + + /** + * Create CSVParser of a GeoIP data + * + * @param manifest Datasource manifest + * @return CSVParser for GeoIP data + */ + @SuppressForbidden(reason = "Need to connect to http endpoint to read GeoIP database file") + public static CSVParser getDatabaseReader(final DatasourceManifest manifest) { + SpecialPermission.check(); + return AccessController.doPrivileged((PrivilegedAction) () -> { + try { + URL zipUrl = new URL(manifest.getUrl()); + ZipInputStream zipIn = new ZipInputStream(zipUrl.openStream()); + ZipEntry zipEntry = zipIn.getNextEntry(); + while (zipEntry != null) { + if (!zipEntry.getName().equalsIgnoreCase(manifest.getDbName())) { + zipEntry = zipIn.getNextEntry(); + continue; + } + return new CSVParser(new BufferedReader(new InputStreamReader(zipIn)), CSVFormat.RFC4180); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + LOGGER.error("ZIP file {} does not have database file {}", manifest.getUrl(), manifest.getDbName()); + throw new RuntimeException("ZIP file does not have database file"); + }); + } + + /** + * Create a document in json string format to ingest in datasource database index + * + * It assumes the first field as ip_range. The rest is added under data field. + * + * Document example + * { + * "_cidr":"1.0.0.1/25", + * "_data":{ + * "country": "USA", + * "city": "Seattle", + * "location":"13.23,42.12" + * } + * } + * + * @param fields a list of field name + * @param values a list of values + * @return Document in json string format + */ + public static String createDocument(final String[] fields, final String[] values) { + StringBuilder sb = new StringBuilder(); + sb.append("{\""); + sb.append(IP_RANGE_FIELD_NAME); + sb.append("\":\""); + sb.append(values[0]); + sb.append("\",\""); + sb.append(DATA_FIELD_NAME); + sb.append("\":{"); + for (int i = 1; i < fields.length; i++) { + if (i != 1) { + sb.append(","); + } + sb.append("\""); + sb.append(fields[i]); + sb.append("\":\""); + sb.append(values[i]); + sb.append("\""); + } + sb.append("}}"); + return sb.toString(); + } + + /** + * Query a given index using a given ip address to get geo data + * + * @param client client + * @param indexName index + * @param ip ip address + * @param actionListener action listener + */ + public static void getGeoData( + final Client client, + final String indexName, + final String ip, + final ActionListener> actionListener + ) { + client.prepareSearch(indexName) + .setSize(1) + .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) + .setPreference("_local") + .execute(new ActionListener<>() { + @Override + public void onResponse(final SearchResponse searchResponse) { + if (searchResponse.getHits().getHits().length == 0) { + actionListener.onResponse(Collections.emptyMap()); + } else { + Map geoData = (Map) XContentHelper.convertToMap( + searchResponse.getHits().getAt(0).getSourceRef(), + false, + XContentType.JSON + ).v2().get(DATA_FIELD_NAME); + actionListener.onResponse(geoData); + } + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }); + } + + /** + * Query a given index using a given ip address iterator to get geo data + * + * This method calls itself recursively until it processes all ip addresses in bulk of {@code bulkSize}. + * + * @param client the client + * @param indexName the index name + * @param ipIterator the iterator of ip addresses + * @param maxBundleSize number of ip address to pass in multi search + * @param maxConcurrentSearches the max concurrent search requests + * @param firstOnly return only the first matching result if true + * @param geoData collected geo data + * @param actionListener the action listener + */ + public static void getGeoData( + final Client client, + final String indexName, + final Iterator ipIterator, + final Integer maxBundleSize, + final Integer maxConcurrentSearches, + final boolean firstOnly, + final Map> geoData, + final ActionListener actionListener + ) { + MultiSearchRequestBuilder mRequestBuilder = client.prepareMultiSearch(); + if (maxConcurrentSearches != 0) { + mRequestBuilder.setMaxConcurrentSearchRequests(maxConcurrentSearches); + } + + List ipsToSearch = new ArrayList<>(maxBundleSize); + while (ipIterator.hasNext() && ipsToSearch.size() < maxBundleSize) { + String ip = ipIterator.next(); + if (geoData.get(ip) == null) { + mRequestBuilder.add( + client.prepareSearch(indexName) + .setSize(1) + .setQuery(QueryBuilders.termQuery(IP_RANGE_FIELD_NAME, ip)) + .setPreference("_local") + ); + ipsToSearch.add(ip); + } + } + + if (ipsToSearch.isEmpty()) { + actionListener.onResponse(null); + return; + } + + mRequestBuilder.execute(new ActionListener<>() { + @Override + public void onResponse(final MultiSearchResponse items) { + for (int i = 0; i < ipsToSearch.size(); i++) { + if (items.getResponses()[i].isFailure()) { + actionListener.onFailure(items.getResponses()[i].getFailure()); + return; + } + + if (items.getResponses()[i].getResponse().getHits().getHits().length == 0) { + geoData.put(ipsToSearch.get(i), Collections.emptyMap()); + continue; + } + + Map data = (Map) XContentHelper.convertToMap( + items.getResponses()[i].getResponse().getHits().getAt(0).getSourceRef(), + false, + XContentType.JSON + ).v2().get(DATA_FIELD_NAME); + + geoData.put(ipsToSearch.get(i), data); + + if (firstOnly) { + actionListener.onResponse(null); + return; + } + } + getGeoData(client, indexName, ipIterator, maxBundleSize, maxConcurrentSearches, firstOnly, geoData, actionListener); + } + + @Override + public void onFailure(final Exception e) { + actionListener.onFailure(e); + } + }); + } + + /** + * Puts GeoIP data from CSVRecord iterator into a given index in bulk + * + * @param client OpenSearch client + * @param indexName Index name to puts the GeoIP data + * @param fields Field name matching with data in CSVRecord in order + * @param iterator GeoIP data to insert + * @param bulkSize Bulk size of data to process + * @param timeout Timeout + */ + public static void putGeoData( + final Client client, + final String indexName, + final String[] fields, + final Iterator iterator, + final int bulkSize, + final TimeValue timeout + ) { + BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + while (iterator.hasNext()) { + try { + CSVRecord record = iterator.next(); + String document = createDocument(fields, record.values()); + IndexRequest request = Requests.indexRequest(indexName).source(document, XContentType.JSON); + bulkRequest.add(request); + if (!iterator.hasNext() || bulkRequest.requests().size() == bulkSize) { + BulkResponse response = client.bulk(bulkRequest).actionGet(timeout); + if (response.hasFailures()) { + LOGGER.error( + "Error occurred while ingesting GeoIP data in {} with an error {}", + indexName, + response.buildFailureMessage() + ); + throw new OpenSearchException("Error occurred while ingesting GeoIP data"); + } + bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + } + } catch (Exception e) { + throw e; + } + } + client.admin().indices().prepareRefresh(indexName).execute().actionGet(timeout); + client.admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).execute().actionGet(timeout); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java new file mode 100644 index 00000000..008feb9f --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettings.java @@ -0,0 +1,137 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.net.MalformedURLException; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; + +import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; + +/** + * Settings for Ip2Geo datasource operations + */ +public class Ip2GeoSettings { + + /** + * Default endpoint to be used in GeoIP datasource creation API + */ + public static final Setting DATASOURCE_ENDPOINT = Setting.simpleString( + "plugins.geospatial.ip2geo.datasource.endpoint", + // TODO: This value is not correct. Update it later once CDN server is ready. + "https://geoip.maps.opensearch.org/v1/geolite-2/manifest.json", + new DatasourceEndpointValidator(), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default update interval to be used in Ip2Geo datasource creation API + */ + public static final Setting DATASOURCE_UPDATE_INTERVAL = Setting.timeSetting( + "plugins.geospatial.ip2geo.datasource.update_interval_in_days", + TimeValue.timeValueDays(3), + TimeValue.timeValueDays(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default timeout value for Ip2Geo processor + */ + public static final Setting TIMEOUT_IN_SECONDS = Setting.timeSetting( + "plugins.geospatial.ip2geo.timeout_in_seconds", + TimeValue.timeValueSeconds(30), + TimeValue.timeValueSeconds(1), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default bulk size for indexing GeoIP data + */ + public static final Setting INDEXING_BULK_SIZE = Setting.intSetting( + "plugins.geospatial.ip2geo.datasource.indexing_bulk_size", + 10000, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default cache size for GeoIP data + */ + public static final Setting CACHE_SIZE = Setting.intSetting( + "plugins.geospatial.ip2geo.processor.cache_size", + 1000, + 0, + Setting.Property.NodeScope + ); + + /** + * Default multi search bundle size for GeoIP data + * + * Multi search is used only when a field contains a list of ip addresses. + */ + public static final Setting MAX_BUNDLE_SIZE = Setting.intSetting( + "plugins.geospatial.ip2geo.processor.max_bundle_size", + 100, + 1, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Default multi search max concurrent searches + * + * Multi search is used only when a field contains a list of ip addresses. + * + * When the value is 0, it will use default value which will be decided + * based on node count and search thread pool size. + */ + public static final Setting MAX_CONCURRENT_SEARCHES = Setting.intSetting( + "plugins.geospatial.ip2geo.processor.max_concurrent_searches", + 0, + 0, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + /** + * Return all settings of Ip2Geo feature + * @return a list of all settings for Ip2Geo feature + */ + public static final List> settings() { + return List.of( + DATASOURCE_ENDPOINT, + DATASOURCE_UPDATE_INTERVAL, + TIMEOUT_IN_SECONDS, + INDEXING_BULK_SIZE, + CACHE_SIZE, + MAX_BUNDLE_SIZE, + MAX_CONCURRENT_SEARCHES + ); + } + + /** + * Visible for testing + */ + protected static class DatasourceEndpointValidator implements Setting.Validator { + @Override + public void validate(final String value) { + try { + new URL(value).toURI(); + } catch (MalformedURLException | URISyntaxException e) { + throw new IllegalArgumentException("Invalid URL format is provided"); + } + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java new file mode 100644 index 00000000..a53e6b72 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -0,0 +1,487 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import static org.opensearch.geospatial.plugin.GeospatialPlugin.IP2GEO_DATASOURCE_INDEX_NAME_PREFIX; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; + +import org.opensearch.core.ParseField; +import org.opensearch.core.xcontent.ConstructingObjectParser; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.geospatial.ip2geo.action.PutDatasourceRequest; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.jobscheduler.spi.schedule.Schedule; +import org.opensearch.jobscheduler.spi.schedule.ScheduleParser; + +/** + * Ip2Geo datasource job parameter + */ +@Getter +@Setter +@AllArgsConstructor +public class Datasource implements ScheduledJobParameter { + private static final long LOCK_DURATION_IN_SECONDS = 60 * 60; + + /** + * Default fields for job scheduling + */ + private static final ParseField ID_FIELD = new ParseField("id"); + private static final ParseField ENABLED_FILED = new ParseField("update_enabled"); + private static final ParseField LAST_UPDATE_TIME_FIELD = new ParseField("last_update_time"); + private static final ParseField LAST_UPDATE_TIME_FIELD_READABLE = new ParseField("last_update_time_field"); + private static final ParseField SCHEDULE_FIELD = new ParseField("schedule"); + private static final ParseField ENABLED_TIME_FILED = new ParseField("enabled_time"); + private static final ParseField ENABLED_TIME_FILED_READABLE = new ParseField("enabled_time_field"); + + /** + * Additional fields for datasource + */ + private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint"); + private static final ParseField STATE_FIELD = new ParseField("state"); + private static final ParseField INDICES_FIELD = new ParseField("indices"); + private static final ParseField DATABASE_FIELD = new ParseField("database"); + private static final ParseField UPDATE_STATS_FIELD = new ParseField("update_stats"); + + /** + * Default variables for job scheduling + */ + + /** + * @param id Id of a datasource + * @return Id of a datasource + */ + private String id; + /** + * @param lastUpdateTime Last update time of a datasource + * @return Last update time of a datasource + */ + private Instant lastUpdateTime; + /** + * @param enabledTime Last time when a scheduling is enabled for a GeoIP data update + * @return Last time when a scheduling is enabled for the job scheduler + */ + private Instant enabledTime; + /** + * @param isEnabled Indicate if GeoIP data update is scheduled or not + * @return Indicate if scheduling is enabled or not + */ + private boolean isEnabled; + /** + * @param schedule Schedule for a GeoIP data update + * @return Schedule for the job scheduler + */ + private Schedule schedule; + + /** + * Additional variables for datasource + */ + + /** + * @param endpoint URL of a manifest file + * @return URL of a manifest file + */ + private String endpoint; + /** + * @param state State of a datasource + * @return State of a datasource + */ + private DatasourceState state; + /** + * @param indices A list of indices having GeoIP data + * @return A list of indices having GeoIP data + */ + private List indices; + /** + * @param database GeoIP database information + * @return GeoIP database information + */ + private Database database; + /** + * @param updateStats GeoIP database update statistics + * @return GeoIP database update statistics + */ + private UpdateStats updateStats; + + /** + * Datasource parser + */ + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datasource_metadata", + true, + args -> { + String id = (String) args[0]; + Instant lastUpdateTime = Instant.ofEpochMilli((long) args[1]); + Instant enabledTime = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]); + boolean isEnabled = (boolean) args[3]; + Schedule schedule = (Schedule) args[4]; + String endpoint = (String) args[5]; + DatasourceState state = DatasourceState.valueOf((String) args[6]); + List indices = (List) args[7]; + Database database = (Database) args[8]; + UpdateStats updateStats = (UpdateStats) args[9]; + Datasource parameter = new Datasource( + id, + lastUpdateTime, + enabledTime, + isEnabled, + schedule, + endpoint, + state, + indices, + database, + updateStats + ); + + return parameter; + } + ); + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD); + PARSER.declareLong(ConstructingObjectParser.constructorArg(), LAST_UPDATE_TIME_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), ENABLED_TIME_FILED); + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED_FILED); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ScheduleParser.parse(p), SCHEDULE_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), ENDPOINT_FIELD); + PARSER.declareString(ConstructingObjectParser.constructorArg(), STATE_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), INDICES_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), Database.PARSER, DATABASE_FIELD); + PARSER.declareObject(ConstructingObjectParser.constructorArg(), UpdateStats.PARSER, UPDATE_STATS_FIELD); + + } + + /** + * Visible for testing + */ + protected Datasource() { + + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + builder.field(ID_FIELD.getPreferredName(), id); + builder.timeField( + LAST_UPDATE_TIME_FIELD.getPreferredName(), + LAST_UPDATE_TIME_FIELD_READABLE.getPreferredName(), + lastUpdateTime.toEpochMilli() + ); + if (enabledTime != null) { + builder.timeField( + ENABLED_TIME_FILED.getPreferredName(), + ENABLED_TIME_FILED_READABLE.getPreferredName(), + enabledTime.toEpochMilli() + ); + } + builder.field(ENABLED_FILED.getPreferredName(), isEnabled); + builder.field(SCHEDULE_FIELD.getPreferredName(), schedule); + builder.field(ENDPOINT_FIELD.getPreferredName(), endpoint); + builder.field(STATE_FIELD.getPreferredName(), state.name()); + builder.field(INDICES_FIELD.getPreferredName(), indices); + builder.field(DATABASE_FIELD.getPreferredName(), database); + builder.field(UPDATE_STATS_FIELD.getPreferredName(), updateStats); + builder.endObject(); + return builder; + } + + @Override + public String getName() { + return id; + } + + @Override + public Instant getLastUpdateTime() { + return lastUpdateTime; + } + + @Override + public Instant getEnabledTime() { + return enabledTime; + } + + @Override + public Schedule getSchedule() { + return schedule; + } + + @Override + public boolean isEnabled() { + return isEnabled; + } + + @Override + public Long getLockDurationSeconds() { + return LOCK_DURATION_IN_SECONDS; + } + + /** + * Enable auto update of GeoIP data + */ + public void enable() { + enabledTime = Instant.now(); + isEnabled = true; + } + + /** + * Disable auto update of GeoIP data + */ + public void disable() { + enabledTime = null; + isEnabled = false; + } + + /** + * Current index name of a datasource + * + * @return Current index name of a datasource + */ + public String currentIndexName() { + return indexNameFor(database.updatedAt.toEpochMilli()); + } + + /** + * Index name for a given manifest + * + * @param manifest manifest + * @return Index name for a given manifest + */ + public String indexNameFor(final DatasourceManifest manifest) { + return indexNameFor(manifest.getUpdatedAt()); + } + + private String indexNameFor(final long suffix) { + return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATASOURCE_INDEX_NAME_PREFIX, id, suffix); + } + + public boolean isExpired() { + if (database.validForInDays == null) { + return false; + } + + Instant lastCheckedAt; + if (updateStats.lastSkippedAt == null) { + lastCheckedAt = updateStats.lastSucceededAt; + } else { + lastCheckedAt = updateStats.lastSucceededAt.isBefore(updateStats.lastSkippedAt) + ? updateStats.lastSkippedAt + : updateStats.lastSucceededAt; + } + return Instant.now().isAfter(lastCheckedAt.plus(database.validForInDays, ChronoUnit.DAYS)); + } + + /** + * Database of a datasource + */ + @Getter + @Setter + @AllArgsConstructor + public static class Database implements ToXContent { + private static final ParseField PROVIDER_FIELD = new ParseField("provider"); + private static final ParseField MD5_HASH_FIELD = new ParseField("md5_hash"); + private static final ParseField UPDATED_AT_FIELD = new ParseField("updated_at"); + private static final ParseField UPDATED_AT_FIELD_READABLE = new ParseField("updated_at_field"); + private static final ParseField FIELDS_FIELD = new ParseField("fields"); + private static final ParseField VALID_FOR_IN_DAYS_FIELD = new ParseField("valid_for_in_days"); + + /** + * @param provider A database provider name + * @return A database provider name + */ + private String provider; + /** + * @param md5Hash MD5 hash value of a database file + * @return MD5 hash value of a database file + */ + private String md5Hash; + /** + * @param updatedAt A date when the database was updated + * @return A date when the database was updated + */ + private Instant updatedAt; + /** + * @param validForInDays A duration in which the database file is valid to use + * @return A duration in which the database file is valid to use + */ + private Long validForInDays; + /** + * @param fields A list of available fields in the database + * @return A list of available fields in the database + */ + private List fields; + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datasource_metadata_database", + true, + args -> { + String provider = (String) args[0]; + String md5Hash = (String) args[1]; + Instant updatedAt = args[2] == null ? null : Instant.ofEpochMilli((Long) args[2]); + Long validForInDays = (Long) args[3]; + List fields = (List) args[4]; + return new Database(provider, md5Hash, updatedAt, validForInDays, fields); + } + ); + static { + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), PROVIDER_FIELD); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), MD5_HASH_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), UPDATED_AT_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VALID_FOR_IN_DAYS_FIELD); + PARSER.declareStringArray(ConstructingObjectParser.optionalConstructorArg(), FIELDS_FIELD); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (provider != null) { + builder.field(PROVIDER_FIELD.getPreferredName(), provider); + } + if (md5Hash != null) { + builder.field(MD5_HASH_FIELD.getPreferredName(), md5Hash); + } + if (updatedAt != null) { + builder.timeField( + UPDATED_AT_FIELD.getPreferredName(), + UPDATED_AT_FIELD_READABLE.getPreferredName(), + updatedAt.toEpochMilli() + ); + } + if (validForInDays != null) { + builder.field(VALID_FOR_IN_DAYS_FIELD.getPreferredName(), validForInDays); + } + if (fields != null) { + builder.startArray(FIELDS_FIELD.getPreferredName()); + for (String field : fields) { + builder.value(field); + } + builder.endArray(); + } + builder.endObject(); + return builder; + } + } + + /** + * Update stats of a datasource + */ + @Getter + @Setter + @AllArgsConstructor + public static class UpdateStats implements ToXContent { + private static final ParseField LAST_SUCCEEDED_AT_FIELD = new ParseField("last_succeeded_at"); + private static final ParseField LAST_SUCCEEDED_AT_FIELD_READABLE = new ParseField("last_succeeded_at_field"); + private static final ParseField LAST_PROCESSING_TIME_IN_MILLIS_FIELD = new ParseField("last_processing_time_in_millis"); + private static final ParseField LAST_FAILED_AT_FIELD = new ParseField("last_failed_at"); + private static final ParseField LAST_FAILED_AT_FIELD_READABLE = new ParseField("last_failed_at_field"); + private static final ParseField LAST_SKIPPED_AT = new ParseField("last_skipped_at"); + private static final ParseField LAST_SKIPPED_AT_READABLE = new ParseField("last_skipped_at_field"); + + /** + * @param lastSucceededAt The last time when GeoIP data update was succeeded + * @return The last time when GeoIP data update was succeeded + */ + private Instant lastSucceededAt; + /** + * @param lastProcessingTimeInMillis The last processing time when GeoIP data update was succeeded + * @return The last processing time when GeoIP data update was succeeded + */ + private Long lastProcessingTimeInMillis; + /** + * @param lastFailedAt The last time when GeoIP data update was failed + * @return The last time when GeoIP data update was failed + */ + private Instant lastFailedAt; + /** + * @param lastSkippedAt The last time when GeoIP data update was skipped as there was no new update from an endpoint + * @return The last time when GeoIP data update was skipped as there was no new update from an endpoint + */ + private Instant lastSkippedAt; + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "datasource_metadata_update_stats", + true, + args -> { + Instant lastSucceededAt = args[0] == null ? null : Instant.ofEpochMilli((long) args[0]); + Long lastProcessingTimeInMillis = (Long) args[1]; + Instant lastFailedAt = args[2] == null ? null : Instant.ofEpochMilli((long) args[2]); + Instant lastSkippedAt = args[3] == null ? null : Instant.ofEpochMilli((long) args[3]); + return new UpdateStats(lastSucceededAt, lastProcessingTimeInMillis, lastFailedAt, lastSkippedAt); + } + ); + + static { + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SUCCEEDED_AT_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_PROCESSING_TIME_IN_MILLIS_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_FAILED_AT_FIELD); + PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), LAST_SKIPPED_AT); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + if (lastSucceededAt != null) { + builder.timeField( + LAST_SUCCEEDED_AT_FIELD.getPreferredName(), + LAST_SUCCEEDED_AT_FIELD_READABLE.getPreferredName(), + lastSucceededAt.toEpochMilli() + ); + } + if (lastProcessingTimeInMillis != null) { + builder.field(LAST_PROCESSING_TIME_IN_MILLIS_FIELD.getPreferredName(), lastProcessingTimeInMillis); + } + if (lastFailedAt != null) { + builder.timeField( + LAST_FAILED_AT_FIELD.getPreferredName(), + LAST_FAILED_AT_FIELD_READABLE.getPreferredName(), + lastFailedAt.toEpochMilli() + ); + } + if (lastSkippedAt != null) { + builder.timeField( + LAST_SKIPPED_AT.getPreferredName(), + LAST_SKIPPED_AT_READABLE.getPreferredName(), + lastSkippedAt.toEpochMilli() + ); + } + builder.endObject(); + return builder; + } + } + + /** + * Builder class for Datasource + */ + public static class Builder { + public static Datasource build(final PutDatasourceRequest request) { + String id = request.getId(); + Instant lastUpdateTime = Instant.now(); + Instant enabledTime = null; + boolean isEnabled = false; + Schedule schedule = new IntervalSchedule(Instant.now(), (int) request.getUpdateIntervalInDays().days(), ChronoUnit.DAYS); + String endpoint = request.getEndpoint(); + DatasourceState state = DatasourceState.PREPARING; + List indices = new ArrayList<>(); + Database database = new Database(null, null, null, null, null); + UpdateStats updateStats = new UpdateStats(null, null, null, null); + return new Datasource(id, lastUpdateTime, enabledTime, isEnabled, schedule, endpoint, state, indices, database, updateStats); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java new file mode 100644 index 00000000..682669cb --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceExtension.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import org.opensearch.jobscheduler.spi.JobSchedulerExtension; +import org.opensearch.jobscheduler.spi.ScheduledJobParser; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; + +/** + * Datasource job scheduler extension + */ +public class DatasourceExtension implements JobSchedulerExtension { + /** + * Job index name for a datasource + */ + public static final String JOB_INDEX_NAME = ".scheduler_geospatial_ip2geo_datasource"; + + @Override + public String getJobType() { + return "scheduler_geospatial_ip2geo_datasource"; + } + + @Override + public String getJobIndex() { + return JOB_INDEX_NAME; + } + + @Override + public ScheduledJobRunner getJobRunner() { + return DatasourceRunner.getJobRunnerInstance(); + } + + @Override + public ScheduledJobParser getJobParser() { + return (parser, id, jobDocVersion) -> Datasource.PARSER.parse(parser, null); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java new file mode 100644 index 00000000..b0f562ba --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceRunner.java @@ -0,0 +1,286 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import java.net.URL; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.jobscheduler.spi.JobExecutionContext; +import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.jobscheduler.spi.utils.LockService; +import org.opensearch.threadpool.ThreadPool; + +/** + * Datasource update task + * + * This is a background task which is responsible for updating Ip2Geo datasource + */ +public class DatasourceRunner implements ScheduledJobRunner { + private static final Logger LOGGER = LogManager.getLogger(DatasourceRunner.class); + + private static DatasourceRunner INSTANCE; + + /** + * Return a singleton job runner instance + * @return job runner + */ + public static DatasourceRunner getJobRunnerInstance() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (DatasourceRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new DatasourceRunner(); + return INSTANCE; + } + } + + private ClusterService clusterService; + private ThreadPool threadPool; + private Client client; + private TimeValue timeout; + private int indexingBulkSize; + + private DatasourceRunner() { + // Singleton class, use getJobRunner method instead of constructor + } + + /** + * Set cluster service + * @param clusterService the cluster service + */ + public void setClusterService(ClusterService clusterService) { + this.clusterService = clusterService; + + timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, it -> timeout = it); + indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(Ip2GeoSettings.INDEXING_BULK_SIZE, it -> indexingBulkSize = it); + } + + /** + * Set thread pool + * @param threadPool the thread pool + */ + public void setThreadPool(ThreadPool threadPool) { + this.threadPool = threadPool; + } + + /** + * Set client + * @param client the client + */ + public void setClient(Client client) { + this.client = client; + } + + /** + * Update GeoIP data + * + * Lock is used so that only one of nodes run this task. + * Lock duration is 1 hour to avoid refreshing. This is okay because update interval is 1 day minimum. + * + * @param jobParameter job parameter + * @param context context + */ + @Override + public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext context) { + LOGGER.info("Update job started for a datasource[{}]", jobParameter.getName()); + if (!(jobParameter instanceof Datasource)) { + throw new IllegalStateException( + "Job parameter is not instance of DatasourceUpdateJobParameter, type: " + jobParameter.getClass().getCanonicalName() + ); + } + + if (clusterService == null) { + throw new IllegalStateException("ClusterService is not initialized."); + } + + if (threadPool == null) { + throw new IllegalStateException("ThreadPool is not initialized."); + } + + if (client == null) { + throw new IllegalStateException("Client is not initialized."); + } + + final LockService lockService = context.getLockService(); + Runnable runnable = () -> { + if (jobParameter.getLockDurationSeconds() != null) { + lockService.acquireLock(jobParameter, context, ActionListener.wrap(lock -> { + if (lock == null) { + return; + } + Datasource parameter = (Datasource) jobParameter; + try { + deleteUnusedIndices(parameter); + updateDatasource(parameter); + deleteUnusedIndices(parameter); + } catch (Exception e) { + LOGGER.error("Failed to update datasource for {}", parameter.getId(), e); + parameter.getUpdateStats().setLastFailedAt(Instant.now()); + DatasourceHelper.updateDatasource(client, parameter, timeout); + } finally { + lockService.release( + lock, + ActionListener.wrap( + released -> { LOGGER.info("Released lock for job {}", jobParameter.getName()); }, + exception -> { throw new IllegalStateException("Failed to release lock."); } + ) + ); + } + }, exception -> { throw new IllegalStateException("Failed to acquire lock."); })); + } + }; + + threadPool.generic().submit(runnable); + } + + /** + * Delete all indices except the one which are being used + * + * @param parameter + */ + private void deleteUnusedIndices(final Datasource parameter) { + try { + List deletedIndices = new ArrayList<>(); + for (String index : parameter.getIndices()) { + if (index.equals(parameter.currentIndexName())) { + continue; + } + + if (!clusterService.state().metadata().hasIndex(index)) { + deletedIndices.add(index); + continue; + } + + try { + if (client.admin() + .indices() + .prepareDelete(index) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .execute() + .actionGet(timeout) + .isAcknowledged()) { + deletedIndices.add(index); + } else { + LOGGER.error("Failed to delete an index {}", index); + } + } catch (Exception e) { + LOGGER.error("Failed to delete an index {}", index, e); + } + } + if (!deletedIndices.isEmpty()) { + parameter.getIndices().removeAll(deletedIndices); + DatasourceHelper.updateDatasource(client, parameter, timeout); + } + } catch (Exception e) { + LOGGER.error("Failed to delete old indices for {}", parameter.getId(), e); + } + } + + /** + * Update GeoIP data internal + * + * @param jobParameter + * @throws Exception + */ + private void updateDatasource(final Datasource jobParameter) throws Exception { + if (!DatasourceState.AVAILABLE.equals(jobParameter.getState())) { + LOGGER.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState()); + jobParameter.disable(); + jobParameter.getUpdateStats().setLastFailedAt(Instant.now()); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + return; + } + + URL url = new URL(jobParameter.getEndpoint()); + DatasourceManifest manifest = DatasourceManifest.Builder.build(url); + + if (skipUpdate(jobParameter, manifest)) { + LOGGER.info("Skipping GeoIP database update. Update is not required for {}", jobParameter.getId()); + jobParameter.getUpdateStats().setLastSkippedAt(Instant.now()); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + return; + } + + Instant startTime = Instant.now(); + String indexName = jobParameter.indexNameFor(manifest); + jobParameter.getIndices().add(indexName); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + GeoIpDataHelper.createIndex(clusterService, client, indexName, timeout); + String[] fields; + try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) { + Iterator iter = reader.iterator(); + fields = iter.next().values(); + if (!jobParameter.getDatabase().getFields().equals(Arrays.asList(fields))) { + LOGGER.error("The previous fields and new fields does not match."); + LOGGER.error("Previous: {}, New: {}", jobParameter.getDatabase().getFields().toString(), Arrays.asList(fields).toString()); + throw new IllegalStateException("Fields does not match between old and new"); + } + GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout); + } + + Instant endTime = Instant.now(); + jobParameter.getDatabase().setProvider(manifest.getProvider()); + jobParameter.getDatabase().setMd5Hash(manifest.getMd5Hash()); + jobParameter.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt())); + jobParameter.getDatabase().setValidForInDays(manifest.getValidForInDays()); + jobParameter.getDatabase().setFields(Arrays.asList(fields)); + jobParameter.getUpdateStats().setLastSucceededAt(endTime); + jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli()); + DatasourceHelper.updateDatasource(client, jobParameter, timeout); + LOGGER.info( + "GeoIP database creation succeeded for {} and took {} seconds", + jobParameter.getId(), + Duration.between(startTime, endTime) + ); + } + + /** + * Determine if update is needed or not + * + * Update is needed when all following conditions are met + * 1. MD5 hash value in datasource is different with MD5 hash value in manifest + * 2. updatedAt value in datasource is before updateAt value in manifest + * + * @param parameter + * @param manifest + * @return + */ + private boolean skipUpdate(final Datasource parameter, final DatasourceManifest manifest) { + if (manifest.getMd5Hash().equals(parameter.getDatabase().getMd5Hash())) { + return true; + } + + return parameter.getDatabase().getUpdatedAt().toEpochMilli() >= manifest.getUpdatedAt(); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java new file mode 100644 index 00000000..6a18be0d --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCache.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.processor; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.cache.Cache; +import org.opensearch.common.cache.CacheBuilder; +import org.opensearch.common.unit.TimeValue; + +/** + * The in-memory cache for the ip2geo data. There should only be 1 instance of this class. + */ +public class Ip2GeoCache { + private static final Logger LOGGER = LogManager.getLogger(Ip2GeoCache.class); + private static final TimeValue CACHING_PERIOD = TimeValue.timeValueMinutes(1); + private final Cache> cache; + + /** + * Default constructor + * + * @param maxSize size of a cache + */ + public Ip2GeoCache(final long maxSize) { + if (maxSize < 0) { + throw new IllegalArgumentException("ip2geo max cache size must be 0 or greater"); + } + this.cache = CacheBuilder.>builder() + .setMaximumWeight(maxSize) + .setExpireAfterWrite(CACHING_PERIOD) + .build(); + } + + /** + * Put data in a cache if it is absent and return the data + * + * @param ip the first part of a key + * @param datasourceName the second part of a key + * @param retrieveFunction function to retrieve a data to be stored in a cache + * @return data in a cache + */ + public Map putIfAbsent( + final String ip, + final String datasourceName, + final Function> retrieveFunction + ) { + CacheKey cacheKey = new CacheKey(ip, datasourceName); + Map response = cache.get(cacheKey); + if (response == null) { + response = retrieveFunction.apply(ip); + response = response == null ? Collections.emptyMap() : response; + cache.put(cacheKey, response); + } + return response; + } + + /** + * Put data in a cache + * + * @param ip the first part of a key + * @param datasourceName the second part of a key + * @param data the data + */ + public void put(final String ip, final String datasourceName, final Map data) { + CacheKey cacheKey = new CacheKey(ip, datasourceName); + cache.put(cacheKey, data); + } + + protected Map get(final String ip, final String datasourceName) { + CacheKey cacheKey = new CacheKey(ip, datasourceName); + return cache.get(cacheKey); + } + + /** + * The key to use for the cache. Since this cache can span multiple ip2geo processors that all use different datasource, the datasource + * name is needed to be included in the cache key. For example, if we only used the IP address as the key the same IP may be in multiple + * datasource with different values. The datasource name scopes the IP to the correct datasource + */ + private static class CacheKey { + + private final String ip; + private final String datasourceName; + + private CacheKey(final String ip, final String datasourceName) { + this.ip = ip; + this.datasourceName = datasourceName; + } + + // generated + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(ip, cacheKey.ip) && Objects.equals(datasourceName, cacheKey.datasourceName); + } + + // generated + @Override + public int hashCode() { + return Objects.hash(ip, datasourceName); + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java new file mode 100644 index 00000000..92b048bc --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -0,0 +1,442 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.geospatial.ip2geo.processor; + +import static org.opensearch.cluster.service.ClusterApplierService.CLUSTER_UPDATE_THREAD_NAME; +import static org.opensearch.ingest.ConfigurationUtils.newConfigurationException; +import static org.opensearch.ingest.ConfigurationUtils.readBooleanProperty; +import static org.opensearch.ingest.ConfigurationUtils.readOptionalList; +import static org.opensearch.ingest.ConfigurationUtils.readStringProperty; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.BiConsumer; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.ip2geo.common.DatasourceHelper; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.ingest.AbstractProcessor; +import org.opensearch.ingest.IngestDocument; +import org.opensearch.ingest.IngestService; +import org.opensearch.ingest.Processor; + +/** + * Ip2Geo processor + */ +public final class Ip2GeoProcessor extends AbstractProcessor { + private static final Logger LOGGER = LogManager.getLogger(Ip2GeoProcessor.class); + private static final Map DATA_EXPIRED = Map.of("error", "ip2geo_data_expired"); + private static final String PROPERTY_IP = "ip"; + private final String field; + private final String targetField; + private final String datasourceName; + private final Set properties; + private final boolean ignoreMissing; + private final boolean firstOnly; + private final Ip2GeoCache cache; + private final Client client; + private final ClusterService clusterService; + + private int maxBundleSize; + private int maxConcurrentSearches; + + /** + * Ip2Geo processor type + */ + public static final String TYPE = "ip2geo"; + + /** + * Construct an Ip2Geo processor. + * @param tag the processor tag + * @param description the processor description + * @param field the source field to geo-IP map + * @param targetField the target field + * @param datasourceName the datasourceName + * @param properties the properties + * @param ignoreMissing true if documents with a missing value for the field should be ignored + * @param firstOnly true if only first result should be returned in case of array + * @param cache the Ip2Geo cache + * @param client the client + * @param clusterService the cluster service + */ + public Ip2GeoProcessor( + final String tag, + final String description, + final String field, + final String targetField, + final String datasourceName, + final Set properties, + final boolean ignoreMissing, + final boolean firstOnly, + final Ip2GeoCache cache, + final Client client, + final ClusterService clusterService + ) { + super(tag, description); + this.field = field; + this.targetField = targetField; + this.datasourceName = datasourceName; + this.properties = properties; + this.ignoreMissing = ignoreMissing; + this.firstOnly = firstOnly; + this.cache = cache; + this.client = client; + this.clusterService = clusterService; + + maxBundleSize = clusterService.getClusterSettings().get(Ip2GeoSettings.MAX_BUNDLE_SIZE); + clusterService.getClusterSettings().addSettingsUpdateConsumer(Ip2GeoSettings.MAX_BUNDLE_SIZE, it -> maxBundleSize = it); + maxConcurrentSearches = clusterService.getClusterSettings().get(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(Ip2GeoSettings.MAX_CONCURRENT_SEARCHES, it -> maxConcurrentSearches = it); + } + + /** + * Add geo data of a given ip address to ingestDocument in asynchronous way + * + * @param ingestDocument the document + * @param handler the handler + */ + @Override + public void execute(IngestDocument ingestDocument, BiConsumer handler) { + Object ip = ingestDocument.getFieldValue(field, Object.class, ignoreMissing); + + if (ip == null && ignoreMissing) { + handler.accept(ingestDocument, null); + return; + } else if (ip == null) { + handler.accept(null, new IllegalArgumentException("field [" + field + "] is null, cannot extract geo information.")); + return; + } + + if (ip instanceof String) { + executeInternal(ingestDocument, handler, (String) ip); + } else if (ip instanceof List) { + executeInternal(ingestDocument, handler, ((List) ip)); + } else { + throw new IllegalArgumentException("field [" + field + "] should contain only string or array of strings"); + } + } + + /** + * Use {@code execute(IngestDocument, BiConsumer)} instead + * + * @param ingestDocument the document + * @return none + */ + @Override + public IngestDocument execute(IngestDocument ingestDocument) { + throw new IllegalStateException("Not implemented"); + } + + /** + * Handle single ip + * + * @param ingestDocument the document + * @param handler the handler + * @param ip the ip + */ + private void executeInternal( + final IngestDocument ingestDocument, + final BiConsumer handler, + final String ip + ) { + Map geoData = cache.get(ip, datasourceName); + if (geoData != null) { + if (!geoData.isEmpty()) { + ingestDocument.setFieldValue(targetField, filteredGeoData(geoData, ip)); + } + handler.accept(ingestDocument, null); + return; + } + + DatasourceHelper.getDatasource(client, datasourceName, new ActionListener<>() { + @Override + public void onResponse(final Datasource datasource) { + if (datasource == null) { + LOGGER.error("Datasource[{}] does not exist", datasourceName); + handler.accept(null, new IllegalStateException("Datasource does not exist")); + return; + } + + if (datasource.isExpired()) { + ingestDocument.setFieldValue(targetField, DATA_EXPIRED); + handler.accept(ingestDocument, null); + return; + } + + GeoIpDataHelper.getGeoData(client, datasource.currentIndexName(), ip, new ActionListener<>() { + @Override + public void onResponse(final Map stringObjectMap) { + cache.put(ip, datasourceName, stringObjectMap); + if (!stringObjectMap.isEmpty()) { + ingestDocument.setFieldValue(targetField, filteredGeoData(stringObjectMap, ip)); + } + handler.accept(ingestDocument, null); + } + + @Override + public void onFailure(final Exception e) { + LOGGER.error("Error while retrieving geo data from datasource[{}] for a given ip[{}]", datasourceName, ip, e); + handler.accept(null, new OpenSearchException("Failed to geo data")); + } + }); + } + + @Override + public void onFailure(final Exception e) { + LOGGER.error("Failed to get datasource[{}]", datasourceName, e); + handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName)); + } + }); + } + + /** + * Handle multiple ips + * + * @param ingestDocument the document + * @param handler the handler + * @param ips the ip list + */ + private void executeInternal( + final IngestDocument ingestDocument, + final BiConsumer handler, + final List ips + ) { + Map> data = new HashMap<>(); + for (Object ip : ips) { + if (ip instanceof String == false) { + throw new IllegalArgumentException("array in field [" + field + "] should only contain strings"); + } + String ipAddr = (String) ip; + data.put(ipAddr, cache.get(ipAddr, datasourceName)); + } + List ipList = (List) ips; + DatasourceHelper.getDatasource(client, datasourceName, new ActionListener<>() { + @Override + public void onResponse(final Datasource datasource) { + if (datasource == null) { + LOGGER.error("Datasource[{}] does not exist", datasourceName); + handler.accept(null, new IllegalStateException("Datasource does not exist")); + return; + } + + if (datasource.isExpired()) { + ingestDocument.setFieldValue(targetField, DATA_EXPIRED); + handler.accept(ingestDocument, null); + return; + } + GeoIpDataHelper.getGeoData( + client, + datasource.currentIndexName(), + ipList.iterator(), + maxBundleSize, + maxConcurrentSearches, + firstOnly, + data, + new ActionListener<>() { + @Override + public void onResponse(final Object obj) { + for (Map.Entry> entry : data.entrySet()) { + cache.put(entry.getKey(), datasourceName, entry.getValue()); + } + + if (firstOnly) { + for (String ipAddr : ipList) { + Map geoData = data.get(ipAddr); + // GeoData for ipAddr won't be null + if (!geoData.isEmpty()) { + ingestDocument.setFieldValue(targetField, geoData); + handler.accept(ingestDocument, null); + return; + } + } + handler.accept(ingestDocument, null); + } else { + boolean match = false; + List> geoDataList = new ArrayList<>(ipList.size()); + for (String ipAddr : ipList) { + Map geoData = data.get(ipAddr); + // GeoData for ipAddr won't be null + geoDataList.add(geoData.isEmpty() ? null : geoData); + if (!geoData.isEmpty()) { + match = true; + } + } + if (match) { + ingestDocument.setFieldValue(targetField, geoDataList); + } + handler.accept(ingestDocument, null); + } + } + + @Override + public void onFailure(final Exception e) { + LOGGER.error( + "Error while retrieving geo data from datasource[{}] for a given ip[{}]", + datasourceName, + ipList, + e + ); + handler.accept(null, new OpenSearchException("Failed to geo data")); + } + } + ); + } + + @Override + public void onFailure(final Exception e) { + LOGGER.error("Failed to get datasource[{}]", datasourceName, e); + handler.accept(null, new OpenSearchException("Failed to get datasource[{}]", datasourceName)); + } + }); + } + + private Map filteredGeoData(final Map geoData, final String ip) { + Map filteredGeoData; + if (properties == null) { + filteredGeoData = geoData; + } else { + filteredGeoData = new HashMap<>(); + for (String property : this.properties) { + if (property.equals(PROPERTY_IP)) { + filteredGeoData.put(PROPERTY_IP, ip); + } else { + filteredGeoData.put(property, geoData.get(property)); + } + } + } + return filteredGeoData; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Ip2Geo processor factory + */ + public static final class Factory implements Processor.Factory { + private final Ip2GeoCache cache; + private final Client client; + private final IngestService ingestService; + private TimeValue timeout; + + /** + * Default constructor + * + * @param cache the cache + * @param client the client + * @param ingestService the ingest service + */ + public Factory(final Ip2GeoCache cache, final Client client, final IngestService ingestService) { + this.cache = cache; + this.client = client; + this.ingestService = ingestService; + + timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(client.settings()); + ClusterSettings clusterSettings = ingestService.getClusterService().getClusterSettings(); + clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, it -> timeout = it); + } + + /** + * When a user create a processor, this method is called twice. Once to validate the new processor and another + * to apply cluster state change after the processor is added. + * + * The second call is made by ClusterApplierService. Therefore, we cannot access cluster state in the call. + * That means, we cannot even query an index inside the call. + * + * Because the processor is validated in the first call, we skip the validation in the second call. + * + * @see org.opensearch.cluster.service.ClusterApplierService#state() + */ + @Override + public Ip2GeoProcessor create( + final Map registry, + final String processorTag, + final String description, + final Map config + ) throws IOException { + String ipField = readStringProperty(TYPE, processorTag, config, "field"); + String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "ip2geo"); + String datasourceName = readStringProperty(TYPE, processorTag, config, "datasource"); + List propertyNames = readOptionalList(TYPE, processorTag, config, "properties"); + boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false); + boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true); + + // Skip validation for the call by cluster applier service + if (!Thread.currentThread().getName().contains(CLUSTER_UPDATE_THREAD_NAME)) { + validate(processorTag, datasourceName, propertyNames); + } + + return new Ip2GeoProcessor( + processorTag, + description, + ipField, + targetField, + datasourceName, + propertyNames == null ? null : new HashSet<>(propertyNames), + ignoreMissing, + firstOnly, + cache, + client, + ingestService.getClusterService() + ); + } + + private void validate(final String processorTag, final String datasourceName, final List propertyNames) throws IOException { + Datasource datasource = DatasourceHelper.getDatasource(client, datasourceName, timeout); + + if (datasource == null) { + throw newConfigurationException(TYPE, processorTag, "datasource", "datasource [" + datasourceName + "] doesn't exist"); + } + + if (!DatasourceState.AVAILABLE.equals(datasource.getState())) { + throw newConfigurationException( + TYPE, + processorTag, + "datasource", + "datasource [" + datasourceName + "] is not in an available state" + ); + } + + if (propertyNames == null) { + return; + } + + // Validate properties are valid. If not add all available properties. + final Set availableProperties = new HashSet<>(datasource.getDatabase().getFields()); + availableProperties.add(PROPERTY_IP); + for (String fieldName : propertyNames) { + if (!availableProperties.contains(fieldName)) { + throw newConfigurationException( + TYPE, + processorTag, + "properties", + "property [" + fieldName + "] is not available in the datasource [" + datasourceName + "]" + ); + } + } + } + } +} diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index 1d2ab65c..c9a43a55 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -20,6 +20,7 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsFilter; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -32,6 +33,13 @@ import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper; import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldTypeParser; import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder; +import org.opensearch.geospatial.ip2geo.action.PutDatasourceAction; +import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; +import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceAction; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoCache; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; import org.opensearch.geospatial.processor.FeatureProcessor; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; import org.opensearch.geospatial.search.aggregations.bucket.geogrid.GeoHexGrid; @@ -41,12 +49,14 @@ import org.opensearch.geospatial.stats.upload.UploadStatsAction; import org.opensearch.geospatial.stats.upload.UploadStatsTransportAction; import org.opensearch.index.mapper.Mapper; +import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.ingest.Processor; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.IngestPlugin; import org.opensearch.plugins.MapperPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.SearchPlugin; +import org.opensearch.plugins.SystemIndexPlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.rest.RestController; import org.opensearch.rest.RestHandler; @@ -58,15 +68,37 @@ * Entry point for Geospatial features. It provides additional Processors, Actions * to interact with Cluster. */ -public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin { +public class GeospatialPlugin extends Plugin implements IngestPlugin, ActionPlugin, MapperPlugin, SearchPlugin, SystemIndexPlugin { + /** + * Prefix of indices to hold GeoIP data for Ip2Geo datasource + */ + public static final String IP2GEO_DATASOURCE_INDEX_NAME_PREFIX = ".ip2geo-datasource"; + + @Override + public Collection getSystemIndexDescriptors(Settings settings) { + return List.of(new SystemIndexDescriptor(IP2GEO_DATASOURCE_INDEX_NAME_PREFIX, "System index used for Ip2Geo datasource")); + } @Override public Map getProcessors(Processor.Parameters parameters) { return MapBuilder.newMapBuilder() .put(FeatureProcessor.TYPE, new FeatureProcessor.Factory()) + .put( + Ip2GeoProcessor.TYPE, + new Ip2GeoProcessor.Factory( + new Ip2GeoCache(Ip2GeoSettings.CACHE_SIZE.get(parameters.client.settings())), + parameters.client, + parameters.ingestService + ) + ) .immutableMap(); } + @Override + public List> getSettings() { + return Ip2GeoSettings.settings(); + } + @Override public Collection createComponents( Client client, @@ -81,6 +113,11 @@ public Collection createComponents( IndexNameExpressionResolver indexNameExpressionResolver, Supplier repositoriesServiceSupplier ) { + // Initialize DatasourceUpdateRunner + DatasourceRunner.getJobRunnerInstance().setClient(client); + DatasourceRunner.getJobRunnerInstance().setClusterService(clusterService); + DatasourceRunner.getJobRunnerInstance().setThreadPool(threadPool); + return List.of(UploadStats.getInstance()); } @@ -94,16 +131,15 @@ public List getRestHandlers( IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster ) { - RestUploadGeoJSONAction uploadGeoJSONAction = new RestUploadGeoJSONAction(); - RestUploadStatsAction statsAction = new RestUploadStatsAction(); - return List.of(statsAction, uploadGeoJSONAction); + return List.of(new RestUploadStatsAction(), new RestUploadGeoJSONAction(), new RestPutDatasourceAction(settings, clusterSettings)); } @Override public List> getActions() { return List.of( new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class), - new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class) + new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class), + new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class) ); } diff --git a/src/main/plugin-metadata/plugin-security.policy b/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 00000000..6e9e1030 --- /dev/null +++ b/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +grant { + // needed by Ip2Geo datasource to get GeoIP database + permission java.net.SocketPermission "*", "connect,resolve"; +}; diff --git a/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension b/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension new file mode 100644 index 00000000..a3a150fc --- /dev/null +++ b/src/main/resources/META-INF/services/org.opensearch.jobscheduler.spi.JobSchedulerExtension @@ -0,0 +1,9 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# + +org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension diff --git a/src/main/resources/mappings/ip2geo_datasource.json b/src/main/resources/mappings/ip2geo_datasource.json new file mode 100644 index 00000000..3179ef0d --- /dev/null +++ b/src/main/resources/mappings/ip2geo_datasource.json @@ -0,0 +1,9 @@ +{ + "dynamic": false, + "properties": { + "_cidr": { + "type": "ip_range", + "doc_values": false + } + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java new file mode 100644 index 00000000..836c2f91 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/PutDatasourceRequestTests.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.Locale; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.test.OpenSearchTestCase; + +public class PutDatasourceRequestTests extends OpenSearchTestCase { + + public void testValidateInvalidUrl() { + PutDatasourceRequest request = new PutDatasourceRequest("test"); + request.setEndpoint("invalidUrl"); + request.setUpdateIntervalInDays(TimeValue.ZERO); + ActionRequestValidationException exception = request.validate(); + assertEquals(1, exception.validationErrors().size()); + assertEquals("Invalid URL format is provided", exception.validationErrors().get(0)); + } + + public void testValidateInvalidManifestFile() { + PutDatasourceRequest request = new PutDatasourceRequest("test"); + request.setId("test"); + request.setEndpoint("https://hi.com"); + request.setUpdateIntervalInDays(TimeValue.ZERO); + ActionRequestValidationException exception = request.validate(); + assertEquals(1, exception.validationErrors().size()); + assertEquals( + String.format(Locale.ROOT, "Error occurred while reading a file from %s", request.getEndpoint()), + exception.validationErrors().get(0) + ); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceActionTests.java new file mode 100644 index 00000000..1a08c5b2 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestPutDatasourceActionTests.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.util.HashSet; + +import org.junit.Before; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +public class RestPutDatasourceActionTests extends RestActionTestCase { + private RestPutDatasourceAction action; + + @Before + public void setupAction() { + action = new RestPutDatasourceAction(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, new HashSet(Ip2GeoSettings.settings()))); + controller().registerHandler(action); + } + + public void testPrepareRequest() { + String content = "{\"endpoint\":\"https://test.com\", \"update_interval\":1}"; + RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath("/_geoip/datasource/test") + .withContent(new BytesArray(content), XContentType.JSON) + .build(); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof PutDatasourceRequest); + PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; + assertEquals("https://test.com", putDatasourceRequest.getEndpoint()); + assertEquals(TimeValue.timeValueDays(1), putDatasourceRequest.getUpdateIntervalInDays()); + assertEquals("test", putDatasourceRequest.getId()); + return null; + }); + + dispatchRequest(restRequest); + } + + public void testPrepareRequestDefaultValue() { + RestRequest restRequestWithEmptyContent = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath("/_geoip/datasource/test") + .withContent(new BytesArray("{}"), XContentType.JSON) + .build(); + + RestRequest restRequestWithoutContent = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.PUT) + .withPath("/_geoip/datasource/test") + .build(); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof PutDatasourceRequest); + PutDatasourceRequest putDatasourceRequest = (PutDatasourceRequest) actionRequest; + assertEquals("https://geoip.maps.opensearch.org/v1/geolite-2/manifest.json", putDatasourceRequest.getEndpoint()); + assertEquals(TimeValue.timeValueDays(3), putDatasourceRequest.getUpdateIntervalInDays()); + assertEquals("test", putDatasourceRequest.getId()); + return null; + }); + + dispatchRequest(restRequestWithEmptyContent); + dispatchRequest(restRequestWithoutContent); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java new file mode 100644 index 00000000..02ff8d58 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceHelperTests.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; + +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; +import org.opensearch.index.IndexNotFoundException; +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.test.rest.RestActionTestCase; + +public class DatasourceHelperTests extends RestActionTestCase { + + public void testUpdateDatasource() throws Exception { + Instant previousTime = Instant.now().minusMillis(1); + Datasource datasource = new Datasource( + "testId", + previousTime, + null, + false, + null, + null, + DatasourceState.PREPARING, + null, + null, + null + ); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof IndexRequest); + IndexRequest request = (IndexRequest) actionRequest; + assertEquals(datasource.getId(), request.id()); + assertEquals(DocWriteRequest.OpType.INDEX, request.opType()); + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + return null; + }); + + DatasourceHelper.updateDatasource(verifyingClient, datasource, TimeValue.timeValueSeconds(30)); + assertTrue(previousTime.isBefore(datasource.getLastUpdateTime())); + } + + public void testGetDatasourceException() throws Exception { + Datasource datasource = new Datasource( + "testId", + Instant.now(), + null, + false, + new IntervalSchedule(Instant.now(), 1, ChronoUnit.DAYS), + "https://test.com", + DatasourceState.PREPARING, + null, + null, + null + ); + + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof GetRequest); + GetRequest request = (GetRequest) actionRequest; + assertEquals(datasource.getId(), request.id()); + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + throw new IndexNotFoundException(DatasourceExtension.JOB_INDEX_NAME); + }); + + assertNull(DatasourceHelper.getDatasource(verifyingClient, datasource.getId(), TimeValue.timeValueSeconds(30))); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java new file mode 100644 index 00000000..6b65026d --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataHelperTests.java @@ -0,0 +1,22 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import org.opensearch.test.OpenSearchTestCase; + +public class GeoIpDataHelperTests extends OpenSearchTestCase { + public void testCreateDocument() { + String[] names = { "ip", "country", "city" }; + String[] values = { "1.0.0.0/25", "USA", "Seattle" }; + assertEquals( + "{\"_cidr\":\"1.0.0.0/25\",\"_data\":{\"country\":\"USA\",\"city\":\"Seattle\"}}", + GeoIpDataHelper.createDocument(names, values) + ); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettingsTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettingsTests.java new file mode 100644 index 00000000..451d0f8e --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/Ip2GeoSettingsTests.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.common; + +import org.opensearch.test.OpenSearchTestCase; + +public class Ip2GeoSettingsTests extends OpenSearchTestCase { + public void testValidateInvalidUrl() { + Ip2GeoSettings.DatasourceEndpointValidator validator = new Ip2GeoSettings.DatasourceEndpointValidator(); + Exception e = expectThrows(IllegalArgumentException.class, () -> validator.validate("InvalidUrl")); + assertEquals("Invalid URL format is provided", e.getMessage()); + } + + public void testValidateValidUrl() { + Ip2GeoSettings.DatasourceEndpointValidator validator = new Ip2GeoSettings.DatasourceEndpointValidator(); + validator.validate("https://test.com"); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java new file mode 100644 index 00000000..8c543da2 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceTests.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.jobscheduler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.geospatial.plugin.GeospatialPlugin.IP2GEO_DATASOURCE_INDEX_NAME_PREFIX; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.Locale; + +import org.opensearch.geospatial.ip2geo.common.DatasourceManifest; +import org.opensearch.test.OpenSearchTestCase; + +public class DatasourceTests extends OpenSearchTestCase { + public void testCurrentIndexName() { + String id = "test"; + Instant now = Instant.now(); + Datasource datasource = new Datasource(); + datasource.setId(id); + datasource.setDatabase(new Datasource.Database("provider", "md5Hash", now, 10l, new ArrayList<>())); + assertEquals( + String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATASOURCE_INDEX_NAME_PREFIX, id, now.toEpochMilli()), + datasource.currentIndexName() + ); + } + + public void testGetIndexNameFor() { + long updatedAt = 123123123l; + DatasourceManifest manifest = mock(DatasourceManifest.class); + when(manifest.getUpdatedAt()).thenReturn(updatedAt); + + String id = "test"; + Datasource datasource = new Datasource(); + datasource.setId(id); + assertEquals( + String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATASOURCE_INDEX_NAME_PREFIX, id, updatedAt), + datasource.indexNameFor(manifest) + ); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCacheTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCacheTests.java new file mode 100644 index 00000000..ad340f47 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoCacheTests.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.geospatial.ip2geo.processor; + +import java.util.HashMap; +import java.util.Map; + +import org.opensearch.OpenSearchException; +import org.opensearch.test.OpenSearchTestCase; + +public class Ip2GeoCacheTests extends OpenSearchTestCase { + public void testCachesAndEvictsResults() { + Ip2GeoCache cache = new Ip2GeoCache(1); + String datasource = "datasource"; + Map response1 = new HashMap<>(); + Map response2 = new HashMap<>(); + assertNotSame(response1, response2); + + // add a key + Map cachedResponse = cache.putIfAbsent("127.0.0.1", datasource, key -> response1); + assertSame(cachedResponse, response1); + assertSame(cachedResponse, cache.putIfAbsent("127.0.0.1", datasource, key -> response2)); + assertSame(cachedResponse, cache.get("127.0.0.1", datasource)); + + // evict old key by adding another value + cachedResponse = cache.putIfAbsent("127.0.0.2", datasource, key -> response2); + assertSame(cachedResponse, response2); + assertSame(cachedResponse, cache.putIfAbsent("127.0.0.2", datasource, ip -> response2)); + assertSame(cachedResponse, cache.get("127.0.0.2", datasource)); + + assertNotSame(response1, cache.get("127.0.0.1", datasource)); + } + + public void testThrowsFunctionsException() { + Ip2GeoCache cache = new Ip2GeoCache(1); + expectThrows( + OpenSearchException.class, + () -> cache.putIfAbsent("127.0.0.1", "datasource", ip -> { throw new OpenSearchException("bad"); }) + ); + } + + public void testNoExceptionForNullValue() { + Ip2GeoCache cache = new Ip2GeoCache(1); + Map response = cache.putIfAbsent("127.0.0.1", "datasource", ip -> null); + assertTrue(response.isEmpty()); + } + + public void testInvalidInit() { + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> new Ip2GeoCache(-1)); + assertEquals("ip2geo max cache size must be 0 or greater", ex.getMessage()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 1a775955..85a90cda 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -5,25 +5,53 @@ package org.opensearch.geospatial.plugin; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashSet; import java.util.List; import java.util.Map; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; +import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceAction; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.processor.FeatureProcessor; import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction; import org.opensearch.geospatial.stats.upload.RestUploadStatsAction; +import org.opensearch.ingest.IngestService; import org.opensearch.ingest.Processor; import org.opensearch.plugins.ActionPlugin; import org.opensearch.plugins.IngestPlugin; import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; import org.opensearch.test.OpenSearchTestCase; public class GeospatialPluginTests extends OpenSearchTestCase { + private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet(Ip2GeoSettings.settings())); + private final List SUPPORTED_REST_HANDLERS = List.of( + new RestUploadGeoJSONAction(), + new RestUploadStatsAction(), + new RestPutDatasourceAction(Settings.EMPTY, clusterSettings) + ); + private final Client client; + private final ClusterService clusterService; + private final IngestService ingestService; - private final List SUPPORTED_REST_HANDLERS = List.of(new RestUploadGeoJSONAction(), new RestUploadStatsAction()); + public GeospatialPluginTests() { + client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + clusterService = mock(ClusterService.class); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + ingestService = mock(IngestService.class); + when(ingestService.getClusterService()).thenReturn(clusterService); + } public void testIsAnIngestPlugin() { GeospatialPlugin plugin = new GeospatialPlugin(); @@ -31,15 +59,30 @@ public void testIsAnIngestPlugin() { } public void testFeatureProcessorIsAdded() { + Processor.Parameters parameters = new Processor.Parameters( + mock(Environment.class), + mock(ScriptService.class), + null, + null, + null, + null, + ingestService, + client, + null + ); + GeospatialPlugin plugin = new GeospatialPlugin(); - Map processors = plugin.getProcessors(null); + Map processors = plugin.getProcessors(parameters); assertTrue(processors.containsKey(FeatureProcessor.TYPE)); assertTrue(processors.get(FeatureProcessor.TYPE) instanceof FeatureProcessor.Factory); } public void testTotalRestHandlers() { GeospatialPlugin plugin = new GeospatialPlugin(); - assertEquals(SUPPORTED_REST_HANDLERS.size(), plugin.getRestHandlers(Settings.EMPTY, null, null, null, null, null, null).size()); + assertEquals( + SUPPORTED_REST_HANDLERS.size(), + plugin.getRestHandlers(Settings.EMPTY, null, clusterSettings, null, null, null, null).size() + ); } public void testUploadGeoJSONTransportIsAdded() { diff --git a/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml b/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml index 21c13a60..8468a6c3 100644 --- a/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml +++ b/src/yamlRestTest/resources/rest-api-spec/test/10_basic.yml @@ -5,4 +5,4 @@ h: component - match: - $body: /^opensearch-geospatial\n$/ + $body: /^opensearch-geospatial\nopensearch-job-scheduler\n$/