IP2Geo processor implementation (#362)
* Implement creation of ip2geo feature (#257)

* Update gradle version to 7.6 (#265)

Signed-off-by: Vijayan Balasubramanian <[email protected]>

* Implement creation of ip2geo feature

* Implementation of ip2geo datasource creation
* Implementation of ip2geo processor creation

Signed-off-by: Heemin Kim <[email protected]>
---------

Signed-off-by: Vijayan Balasubramanian <[email protected]>
Signed-off-by: Heemin Kim <[email protected]>
Co-authored-by: Vijayan Balasubramanian <[email protected]>

* Added unit tests with some refactoring of code (#271)

* Add Unit tests
* Set cache true for search query
* Remove in-memory cache implementation (two-way-door decision)
 * Rely on the search cache instead of a custom cache
* Renamed datasource state from FAILED to CREATE_FAILED
* Renamed class name from *Helper to *Facade
* Changed updateIntervalInDays to updateInterval
* Changed value type of default update_interval from TimeValue to Long
* Read setting value from cluster settings directly

Signed-off-by: Heemin Kim <[email protected]>

* Sync from main (#280)

* Update gradle version to 7.6 (#265)

Signed-off-by: Vijayan Balasubramanian <[email protected]>

* Exclude lombok generated code from jacoco coverage report (#268)

Signed-off-by: Heemin Kim <[email protected]>

* Make jacoco report to be generated faster in local (#267)

Signed-off-by: Heemin Kim <[email protected]>

* Update dependency org.json:json to v20230227 (#273)

Co-authored-by: mend-for-github.aaakk.us.kg[bot] <50673670+mend-for-github.aaakk.us.kg[bot]@users.noreply.github.com>

* Baseline owners and maintainers (#275)

Signed-off-by: Vijayan Balasubramanian <[email protected]>

---------

Signed-off-by: Vijayan Balasubramanian <[email protected]>
Signed-off-by: Heemin Kim <[email protected]>
Co-authored-by: Vijayan Balasubramanian <[email protected]>
Co-authored-by: mend-for-github.aaakk.us.kg[bot] <50673670+mend-for-github.aaakk.us.kg[bot]@users.noreply.github.com>

* Add datasource name validation (#281)

Signed-off-by: Heemin Kim <[email protected]>

* Refactoring of code (#282)

1. Change variable name from datasourceName to name
2. Change variable name from id to name
3. Added helper methods in test code

Signed-off-by: Heemin Kim <[email protected]>

* Change field name from md5 to sha256 (#285)

Signed-off-by: Heemin Kim <[email protected]>

* Implement get datasource api (#279)

Signed-off-by: Heemin Kim <[email protected]>

* Update index option (#284)

1. Make the geodata index hidden
2. Make the geodata index read_only_allow_delete after creation is done
3. Refresh the datasource index immediately after an update

Signed-off-by: Heemin Kim <[email protected]>

* Make some fields in the manifest file mandatory (#289)

Signed-off-by: Heemin Kim <[email protected]>

* Create datasource index explicitly (#283)

Signed-off-by: Heemin Kim <[email protected]>

* Add wrapper class of job scheduler lock service (#290)

Signed-off-by: Heemin Kim <[email protected]>

* Remove all unused client attributes (#293)

Signed-off-by: Heemin Kim <[email protected]>

* Update copyright header (#298)

Signed-off-by: Heemin Kim <[email protected]>

* Run system index handling code with stashed thread context (#297)

Signed-off-by: Heemin Kim <[email protected]>

* Reduce lock duration and renew the lock during update (#299)

Signed-off-by: Heemin Kim <[email protected]>

* Implements delete datasource API (#291)

Signed-off-by: Heemin Kim <[email protected]>

* Set User-Agent in http request (#300)

Signed-off-by: Heemin Kim <[email protected]>

* Implement datasource update API (#292)

Signed-off-by: Heemin Kim <[email protected]>

* Refactoring test code (#302)

Make the buildGeoJSONFeatureProcessorConfig method more general

Signed-off-by: Heemin Kim <[email protected]>

* Add ip2geo processor integ test for failure case (#303)

Signed-off-by: Heemin Kim <[email protected]>

* Bug fix and refactoring of code (#305)

1. Bugfix: Ingest metadata can be null if there is no processor created
2. Refactoring: Moved private method to another class for better testing support
3. Refactoring: Set some private static final variables to public so that unit tests can use them
4. Refactoring: Changed string values to static variables

Signed-off-by: Heemin Kim <[email protected]>

* Add integration test for Ip2GeoProcessor (#306)

Signed-off-by: Heemin Kim <[email protected]>

* Add ConcurrentModificationException (#308)

Signed-off-by: Heemin Kim <[email protected]>

* Add integration test for UpdateDatasource API (#307)

Signed-off-by: Heemin Kim <[email protected]>

* Bug fix on lock management and a few performance improvements (#310)

* Release lock before response back to caller for update/delete API
* Release lock in background task for creation API
* Change index settings to improve indexing performance

Signed-off-by: Heemin Kim <[email protected]>

* Change index setting from read_only_allow_delete to write (#311)

read_only_allow_delete does not block writes to an index.
The disk-based shard allocator may add and remove this block automatically.
Therefore, use index.blocks.write instead (see the sketch below).

Signed-off-by: Heemin Kim <[email protected]>
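
As a hedged sketch of the new behavior, the snippet below applies `index.blocks.write` to an index once it is fully built; the surrounding class, method, and index name are illustrative assumptions, not the plugin's actual code.

```java
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.common.settings.Settings;

// Illustrative only: block writes on a GeoIP data index after indexing completes.
public class BlockWritesSketch {
    UpdateSettingsRequest blockWrites(final String indexName) {
        // index.blocks.write is never added or removed by the disk-based shard allocator,
        // unlike index.blocks.read_only_allow_delete.
        return new UpdateSettingsRequest(indexName).settings(
            Settings.builder().put("index.blocks.write", true).build()
        );
    }
}
```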

* Fix bug in get datasource API and improve memory usage (#313)

Signed-off-by: Heemin Kim <[email protected]>

* Change package for Strings.hasText (#314) (#317)

Signed-off-by: Heemin Kim <[email protected]>

* Remove jitter and move index setting from DatasourceFacade to DatasourceExtension (#319)

Signed-off-by: Heemin Kim <[email protected]>

* Do not index blank value and do not enrich null property (#320)

Signed-off-by: Heemin Kim <[email protected]>

* Move index setting keys to constants (#321)

Signed-off-by: Heemin Kim <[email protected]>

* Return null index name for expired data (#322)

Return a null index name for expired data so that it can be deleted
by the cleanup process. The cleanup process excludes the current index from deletion.

Signed-off-by: Heemin Kim <[email protected]>

* Add new fields in datasource (#325)

Signed-off-by: Heemin Kim <[email protected]>

* Delete index once it is expired (#326)

Signed-off-by: Heemin Kim <[email protected]>

* Add restoring event listener (#328)

In the listener, we trigger a GeoIP data update.

Signed-off-by: Heemin Kim <[email protected]>

* Reverse forcemerge and refresh order (#331)

Otherwise, OpenSearch does not clear old segment files.

Signed-off-by: Heemin Kim <[email protected]>

* Removed parameter and settings (#332)

* Removed first_only parameter
* Removed max_concurrency and batch_size setting

The first_only parameter was added because the existing geoip processor has it.
However, it provides no benefit for the ip2geo processor: we do not run a sequential search over array data but use a multi-search instead (see the sketch below).

The max_concurrency and batch_size settings are removed because they only expose internal implementation details and could become a blocker for future performance improvements.

Signed-off-by: Heemin Kim <[email protected]>
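
For illustration only, here is a minimal sketch of the multi-search idea: one search per IP in an array, submitted in a single request. The index name, field name, and client call are assumptions, not the plugin's actual schema.

```java
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

// Illustrative only: look up every IP of an array field in one round trip
// instead of searching for each IP sequentially.
public class MultiSearchSketch {
    MultiSearchRequest lookupAll(final String[] ips) {
        MultiSearchRequest multiSearch = new MultiSearchRequest();
        for (String ip : ips) {
            multiSearch.add(
                new SearchRequest("geoip-data")   // hypothetical index name
                    .source(new SearchSourceBuilder()
                        .query(QueryBuilders.termQuery("_cidr", ip))   // hypothetical field name
                        .size(1))
            );
        }
        // client.multiSearch(multiSearch, listener) would then execute it asynchronously.
        return multiSearch;
    }
}
```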

* Add a field in datasource for current index name (#333)

Signed-off-by: Heemin Kim <[email protected]>

* Delete GeoIP data indices after restoring complete (#334)

We don't want to use restored GeoIP data indices. Therefore, we
delete those indices once the restore process completes.

When the GeoIP metadata index is restored, we create a new GeoIP data index instead.

Signed-off-by: Heemin Kim <[email protected]>

* Use bool query for array form of IPs (#335)

Signed-off-by: Heemin Kim <[email protected]>

* Run update/delete request in a new thread (#337)

This avoids blocking the transport thread.

Signed-off-by: Heemin Kim <[email protected]>

* Remove IP2Geo processor validation (#336)

We cannot query the index for data to validate the IP2Geo processor.
Validation will be added if we decide to store some of the data in cluster state metadata.

Signed-off-by: Heemin Kim <[email protected]>

* Acquire lock synchronously (#339)

When the lock is acquired asynchronously, the remaining part of the code
runs on the transport thread, which does not allow blocking calls.
We want only a single update to happen on a node, using a single thread. However,
that cannot be achieved if the lock is acquired asynchronously and a listener is passed.

Signed-off-by: Heemin Kim <[email protected]>
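
A minimal sketch of the synchronous-acquisition idea, using hypothetical `AsyncLockService` and `Lock` types rather than the job scheduler API: the asynchronous callback is bridged to a blocking call that must run off the transport thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: the lock service and lock types below are hypothetical stand-ins.
public class SynchronousLockSketch {

    interface Lock {}

    interface AsyncLockService {
        void acquireLock(Consumer<Lock> onAcquired, Consumer<Exception> onFailure);
    }

    Lock acquireLockSynchronously(final AsyncLockService lockService) throws Exception {
        CompletableFuture<Lock> future = new CompletableFuture<>();
        lockService.acquireLock(future::complete, future::completeExceptionally);
        // Blocking is only safe on a generic worker thread; the transport thread must not block,
        // which is why update/delete work runs on its own thread (see #337).
        return future.get(30, TimeUnit.SECONDS);
    }
}
```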

* Added a cache to store datasource metadata (#338)

Signed-off-by: Heemin Kim <[email protected]>

* Changed class name and package (#341)

Signed-off-by: Heemin Kim <[email protected]>

* Refactoring of code (#342)

1. Changed class name from Ip2GeoCache to Ip2GeoCachedDao
2. Moved the Ip2GeoCachedDao from cache to dao package

Signed-off-by: Heemin Kim <[email protected]>

* Add geo data cache (#340)

Signed-off-by: Heemin Kim <[email protected]>

* Add cache layer to reduce GeoIp data retrieval latency (#343)

Signed-off-by: Heemin Kim <[email protected]>

* Use _primary in query preference and a few changes (#347)

1. Use the _primary preference when reading datasource metadata so that the latest data is visible (see the sketch below); RefreshPolicy.IMMEDIATE won't refresh replica shards immediately, according to #346.
2. Update the datasource metadata index mapping.
3. Move the batch size from a static value to a setting.

Signed-off-by: Heemin Kim <[email protected]>
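
A hedged sketch of what a `_primary`-preference read of datasource metadata might look like; the index name and query are placeholders rather than the plugin's real ones.

```java
import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

// Illustrative only: route the metadata read to primary shards so the latest write is visible.
public class PrimaryPreferenceSketch {
    SearchRequest datasourceMetadataRequest(final String datasourceName) {
        return new SearchRequest(".ip2geo-datasource")   // placeholder index name
            .preference("_primary")
            .source(new SearchSourceBuilder().query(QueryBuilders.idsQuery().addIds(datasourceName)));
    }
}
```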

* Wait until GeoIP data is replicated to all data nodes (#348)

Signed-off-by: Heemin Kim <[email protected]>

* Update packages according to a change in OpenSearch core (#354)

* Update packages according to a change in OpenSearch core

Signed-off-by: Heemin Kim <[email protected]>

* Update packages according to a change in OpenSearch core (#353)

Signed-off-by: Heemin Kim <[email protected]>

---------

Signed-off-by: Heemin Kim <[email protected]>

---------

Signed-off-by: Vijayan Balasubramanian <[email protected]>
Signed-off-by: Heemin Kim <[email protected]>
Co-authored-by: Vijayan Balasubramanian <[email protected]>
Co-authored-by: mend-for-github.aaakk.us.kg[bot] <50673670+mend-for-github.aaakk.us.kg[bot]@users.noreply.github.com>
(cherry picked from commit 0cd9153)
heemin32 authored and github-actions[bot] committed Jul 24, 2023
1 parent 64cd7b7 commit 8a9302f
Showing 96 changed files with 10,418 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Enhancements
### Bug Fixes
### Infrastructure
* Make jacoco report to be generated faster in local ([#267](https://github.com/opensearch-project/geospatial/pull/267))
* Exclude lombok generated code from jacoco coverage report ([#268](https://github.com/opensearch-project/geospatial/pull/268))
### Documentation
### Maintenance
* Change package for Strings.hasText ([#314](https://github.com/opensearch-project/geospatial/pull/314))
55 changes: 53 additions & 2 deletions build.gradle
@@ -5,6 +5,8 @@

import org.opensearch.gradle.test.RestIntegTestTask

import java.util.concurrent.Callable

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'opensearch.opensearchplugin'
@@ -35,6 +37,7 @@ opensearchplugin {
classname "${projectPath}.${pathToPlugin}.${pluginClassName}"
licenseFile rootProject.file('LICENSE')
noticeFile rootProject.file('NOTICE')
extendedPlugins = ['opensearch-job-scheduler']
}

// This requires an additional Jar not published as part of build-tools
@@ -142,6 +145,10 @@ publishing {
}


configurations {
zipArchive
}

//****************************************************************************/
// Dependencies
//****************************************************************************/
@@ -154,6 +161,9 @@ dependencies {
implementation "org.apache.commons:commons-lang3:3.12.0"
implementation "org.locationtech.spatial4j:spatial4j:${versions.spatial4j}"
implementation "org.locationtech.jts:jts-core:${versions.jts}"
implementation "org.apache.commons:commons-csv:1.10.0"
zipArchive group: 'org.opensearch.plugin', name:'opensearch-job-scheduler', version: "${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
}

licenseHeaders.enabled = true
@@ -206,8 +216,6 @@ integTest {
testClusters.integTest {
testDistribution = "ARCHIVE"

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
@@ -220,6 +228,49 @@ testClusters.integTest {
debugPort += 1
}
}

// This installs our plugin into the testClusters
plugin(project.tasks.bundlePlugin.archiveFile)
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))

// opensearch-geospatial plugin is being added to the list of plugins for the testCluster during build before
// the opensearch-job-scheduler plugin, which is causing build failures. From the stack trace, this looks like a bug.
//
// Exception in thread "main" java.lang.IllegalArgumentException: Missing plugin [opensearch-job-scheduler], dependency of [opensearch-geospatial]
// at org.opensearch.plugins.PluginsService.addSortedBundle(PluginsService.java:515)
//
// A temporary hack is to reorder the plugins list after evaluation but prior to task execution when the plugins are installed.
// See https://github.com/opensearch-project/anomaly-detection/blob/fd547014fdde5114bbc9c8e49fe7aaa37eb6e793/build.gradle#L400-L422
nodes.each { node ->
def plugins = node.plugins
def firstPlugin = plugins.get(0)
plugins.remove(0)
plugins.add(firstPlugin)
}
}

testClusters.yamlRestTest {
plugin(provider(new Callable<RegularFile>(){
@Override
RegularFile call() throws Exception {
return new RegularFile() {
@Override
File getAsFile() {
return configurations.zipArchive.asFileTree.getSingleFile()
}
}
}
}))
}

run {
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.geospatial.annotation;

public @interface VisibleForTesting {
}
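
A hypothetical usage example of the marker annotation above: a helper is widened in visibility only so unit tests can call it; the class and method are illustrative, not part of this change.

```java
import org.opensearch.geospatial.annotation.VisibleForTesting;

// Illustrative only: the annotation documents why the member is more visible than it needs to be.
public class DatasourceNameValidator {
    @VisibleForTesting
    static boolean isValidName(final String name) {
        return name != null && !name.isBlank();
    }
}
```
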
@@ -0,0 +1,18 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.constants;

/**
* Collection of keys for index setting
*/
public class IndexSetting {
public static final String NUMBER_OF_SHARDS = "index.number_of_shards";
public static final String NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final String REFRESH_INTERVAL = "index.refresh_interval";
public static final String AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final String HIDDEN = "index.hidden";
public static final String BLOCKS_WRITE = "index.blocks.write";
}
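
A hedged example of building index settings from these constants; the concrete values (single shard, auto-expanded replicas, hidden, refresh disabled) are illustrative, not the plugin's defaults.

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.geospatial.constants.IndexSetting;

// Illustrative only: assemble settings for a GeoIP data index from the shared constants.
public class GeoDataIndexSettingsSketch {
    Settings geoDataIndexSettings() {
        return Settings.builder()
            .put(IndexSetting.NUMBER_OF_SHARDS, 1)
            .put(IndexSetting.AUTO_EXPAND_REPLICAS, "0-all")
            .put(IndexSetting.HIDDEN, true)
            .put(IndexSetting.REFRESH_INTERVAL, "-1")   // disable refresh while bulk indexing
            .build();
    }
}
```
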
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.rest.RestStatus;

/**
* General ConcurrentModificationException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*
* The exception is thrown when multiple mutation APIs are called on the same resource at the same time
*/
public class ConcurrentModificationException extends OpenSearchException {

public ConcurrentModificationException(String msg, Object... args) {
super(msg, args);
}

public ConcurrentModificationException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ConcurrentModificationException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
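
A hypothetical example of where this exception might be thrown: a second mutating call on a datasource whose lock is already held. The lock helper is a stand-in, not the plugin's lock wrapper.

```java
import org.opensearch.geospatial.exceptions.ConcurrentModificationException;

// Illustrative only: reject concurrent mutation requests for the same datasource.
public class DatasourceMutationGuard {
    void updateDatasource(final String name) {
        if (tryAcquireLock(name) == false) {
            throw new ConcurrentModificationException("another request is processing datasource [{}]", name);
        }
        // ... perform the update while holding the lock ...
    }

    private boolean tryAcquireLock(final String name) {
        return true; // stand-in for the real lock acquisition
    }
}
```
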
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.rest.RestStatus;

/**
* IncompatibleDatasourceException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*
* The exception is thrown when a user tries to update a datasource with a new endpoint that is not compatible
* with the current datasource
*/
public class IncompatibleDatasourceException extends OpenSearchException {

public IncompatibleDatasourceException(String msg, Object... args) {
super(msg, args);
}

public IncompatibleDatasourceException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public IncompatibleDatasourceException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
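
A hypothetical example of the compatibility check this exception backs: a datasource update is rejected when the new endpoint's database no longer provides the fields the current one does. The comparison shown is simplified, not the plugin's actual logic.

```java
import java.util.List;

import org.opensearch.geospatial.exceptions.IncompatibleDatasourceException;

// Illustrative only: a simplified field-compatibility check for a datasource update.
public class DatasourceEndpointCheck {
    void validateCompatibility(final List<String> currentFields, final List<String> newFields) {
        if (newFields.containsAll(currentFields) == false) {
            throw new IncompatibleDatasourceException(
                "new fields [{}] do not contain all the current fields [{}]", newFields, currentFields);
        }
    }
}
```
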
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.exceptions;

import java.io.IOException;

import org.opensearch.OpenSearchException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.rest.RestStatus;

/**
* Generic ResourceInUseException corresponding to the {@link RestStatus#BAD_REQUEST} status code
*/
public class ResourceInUseException extends OpenSearchException {

public ResourceInUseException(String msg, Object... args) {
super(msg, args);
}

public ResourceInUseException(String msg, Throwable cause, Object... args) {
super(msg, cause, args);
}

public ResourceInUseException(StreamInput in) throws IOException {
super(in);
}

@Override
public final RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
* Ip2Geo datasource delete action
*/
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
/**
* Delete datasource action instance
*/
public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
/**
* Delete datasource action name
*/
public static final String NAME = "cluster:admin/geospatial/datasource/delete";

private DeleteDatasourceAction() {
super(NAME, AcknowledgedResponse::new);
}
}
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

/**
* GeoIP datasource delete request
*/
@Getter
@Setter
@AllArgsConstructor
public class DeleteDatasourceRequest extends ActionRequest {
/**
* @param name the datasource name
* @return the datasource name
*/
private String name;

/**
* Constructor
*
* @param in the stream input
* @throws IOException IOException
*/
public DeleteDatasourceRequest(final StreamInput in) throws IOException {
super(in);
this.name = in.readString();
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException errors = null;
if (name == null || name.isBlank()) {
errors = new ActionRequestValidationException();
errors.addValidationError("Datasource name should not be empty");
}
return errors;
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
}
}
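
A hedged sketch of how the delete action and request above might be submitted from a caller; the surrounding class, error handling, and import locations (which vary across OpenSearch versions) are assumptions.

```java
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.geospatial.ip2geo.action.DeleteDatasourceAction;
import org.opensearch.geospatial.ip2geo.action.DeleteDatasourceRequest;

// Illustrative only: submit the delete-datasource action through the node client.
public class DeleteDatasourceCaller {
    void deleteDatasource(final NodeClient client, final String datasourceName) {
        DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasourceName);
        client.execute(DeleteDatasourceAction.INSTANCE, request, ActionListener.wrap(
            response -> {
                // response.isAcknowledged() is true once the datasource has been removed
            },
            exception -> {
                // surface the failure to the caller (e.g. a REST channel)
            }
        ));
    }
}
```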