Skip to content

Commit

Permalink
Implement creation of ip2geo feature
Browse files Browse the repository at this point in the history
* Implementation of ip2geo datasource creation
* Implementation of ip2geo processor creation

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Apr 17, 2023
1 parent e594bf3 commit 0932e92
Show file tree
Hide file tree
Showing 28 changed files with 3,102 additions and 11 deletions.
57 changes: 55 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import org.opensearch.gradle.test.RestIntegTestTask

import java.util.concurrent.Callable

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'opensearch.opensearchplugin'
Expand Down Expand Up @@ -35,6 +37,7 @@ opensearchplugin {
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
licenseFile rootProject.file('LICENSE')
noticeFile rootProject.file('NOTICE')
extendedPlugins = ['opensearch-job-scheduler']
}

// This requires an additional Jar not published as part of build-tools
Expand Down Expand Up @@ -142,6 +145,10 @@ publishing {
}


configurations {
zipArchive
}

//****************************************************************************/
// Dependencies
//****************************************************************************/
Expand All @@ -154,6 +161,11 @@ dependencies {
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.locationtech.spatial4j:spatial4j:${versions.spatial4j}"
implementation "org.locationtech.jts:jts-core:${versions.jts}"
api("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
api("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")
implementation "org.apache.commons:commons-csv:1.10.0"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
}

licenseHeaders.enabled = true
Expand Down Expand Up @@ -206,8 +218,6 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
Expand All @@ -220,6 +230,49 @@ testClusters.integTest {
debugPort += 1
}
}

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))

// opensearch-geospatial plugin is being added to the list of plugins for the testCluster during build before
// the opensearch-job-scheduler plugin, which is causing build failures. From the stack trace, this looks like a bug.
//
// Exception in thread "main" java.lang.IllegalArgumentException: Missing plugin [opensearch-job-scheduler], dependency of [opensearch-geospatial]
// at org.opensearch.plugins.PluginsService.addSortedBundle(PluginsService.java:515)
//
// A temporary hack is to reorder the plugins list after evaluation but prior to task execution when the plugins are installed.
// See https://github.com/opensearch-project/anomaly-detection/blob/fd547014fdde5114bbc9c8e49fe7aaa37eb6e793/build.gradle#L400-L422
nodes.each { node ->
def plugins = node.plugins
def firstPlugin = plugins.get(0)
plugins.remove(0)
plugins.add(firstPlugin)
}
}

testClusters.yamlRestTest {
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))
}

run {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* GeoIp datasource creation action
*/
public class PutDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Put datasource action instance
*/
public static final PutDatasourceAction INSTANCE = new PutDatasourceAction();
/**
* Name of a put datasource action
*/
public static final String NAME = "cluster:admin/geospatial/datasource";

private PutDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.Locale;

import lombok.Getter;
import lombok.Setter;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.support.master.AcknowledgedRequest;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.ParseField;
import org.opensearch.core.xcontent.ObjectParser;
import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;

/**
* GeoIP datasource creation request
*/
@Getter
@Setter
public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
private static final Logger LOGGER = LogManager.getLogger(PutDatasourceRequest.class);
private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
/**
* @param id Datasource name
* @return Datasource name
*/
private String id;
/**
* @param endpoint URL to a manifest file for a datasource
* @return URL to a manifest file for a datasource
*/
private String endpoint;
/**
* @param updateIntervalInDays Update interval of a datasource
* @return Update interval of a datasource
*/
private TimeValue updateIntervalInDays;

/**
* Parser of a datasource
*/
public static final ObjectParser<PutDatasourceRequest, Void> PARSER;
static {
PARSER = new ObjectParser<>("put_datasource");
PARSER.declareString((request, val) -> request.setEndpoint(val), ENDPOINT_FIELD);
PARSER.declareLong((request, val) -> request.setUpdateIntervalInDays(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD);
}

/**
* Default constructor
* @param id name of a datasource
*/
public PutDatasourceRequest(final String id) {
this.id = id;
}

/**
* Constructor with stream input
* @param in the stream input
* @throws IOException IOException
*/
public PutDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.id = in.readString();
this.endpoint = in.readString();
this.updateIntervalInDays = in.readTimeValue();
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(id);
out.writeString(endpoint);
out.writeTimeValue(updateIntervalInDays);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = new ActionRequestValidationException();
validateEndpoint(errors);
validateUpdateInterval(errors);
return errors.validationErrors().isEmpty() ? null : errors;
}

private void validateEndpoint(final ActionRequestValidationException errors) {
try {
URL url = new URL(endpoint);
url.toURI(); // Validate URL complies with RFC-2396
validateManifestFile(url, errors);
} catch (MalformedURLException | URISyntaxException e) {
LOGGER.info("Invalid URL format is provided", e);
errors.addValidationError("Invalid URL format is provided");
}
}

private void validateManifestFile(final URL url, final ActionRequestValidationException errors) {
try {
DatasourceManifest manifest = DatasourceManifest.Builder.build(url);
new URL(manifest.getUrl()).toURI(); // Validate URL complies with RFC-2396
if (manifest.getValidForInDays() <= updateIntervalInDays.days()) {
errors.addValidationError(
String.format(
Locale.ROOT,
"updateInterval %d is should be smaller than %d",
updateIntervalInDays.days(),
manifest.getValidForInDays()
)
);
}
} catch (MalformedURLException | URISyntaxException e) {
LOGGER.info("Invalid URL format is provided for url field in the manifest file", e);
errors.addValidationError("Invalid URL format is provided for url field in the manifest file");
} catch (Exception e) {
LOGGER.info("Error occurred while reading a file from {}", url, e);
errors.addValidationError(String.format(Locale.ROOT, "Error occurred while reading a file from %s", url));
}
}

private void validateUpdateInterval(final ActionRequestValidationException errors) {
if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) > 0) {
errors.addValidationError("Update interval should be equal to or larger than 1 day");
}
}
}
Loading

0 comments on commit 0932e92

Please sign in to comment.