Added unit tests with some refactoring of code (#271)
* Add unit tests
* Set cache to true for the search query
* Remove the in-memory cache implementation (a two-way-door decision)
 * Rely on the search cache instead of a custom cache
* Rename the datasource state FAILED to CREATE_FAILED (the states touched by this commit are sketched below)
* Rename *Helper classes to *Facade
* Rename updateIntervalInDays to updateInterval
* Change the value type of the default update_interval from TimeValue to Long
* Read setting values from cluster settings directly

Signed-off-by: Heemin Kim <[email protected]>
heemin32 authored May 3, 2023
1 parent 4b29b4e commit 14230e8
Showing 38 changed files with 2,451 additions and 878 deletions.
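The renamed states run through the diffs below: the transport action now expects CREATING on entry and moves the datasource to CREATE_FAILED on any creation error, where the removed code checked PREPARING and set FAILED. A minimal sketch of just the states referenced in this commit — the real DatasourceState enum is not shown on this page and likely defines more values:

// Sketch only: the states referenced in this diff; the actual enum may define more.
public enum DatasourceState {
    CREATING,      // initial GeoIP data load in progress (the removed code checked PREPARING)
    AVAILABLE,     // datasource created and ready to serve ip2geo lookups
    CREATE_FAILED  // initial creation failed; renamed from FAILED in this commit
}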
Empty file added: lombok.config
org/opensearch/geospatial/annotation/VisibleForTesting.java (new file)

@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.geospatial.annotation;
+
+public @interface VisibleForTesting {
+}
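This new marker annotation documents members whose visibility was widened (typically private to protected) only so unit tests can reach them; it declares no retention policy or target, so it defaults to class-file retention and applies to any declaration. A minimal, self-contained illustration of the pattern — Worker and its methods are hypothetical, not part of this commit:

import org.opensearch.geospatial.annotation.VisibleForTesting;

class Worker {
    public String run(final String input) {
        return normalize(input);
    }

    @VisibleForTesting
    protected String normalize(final String input) { // protected rather than private, so a same-package test can call it
        return input.trim();
    }
}

class WorkerTests {
    void testNormalize() {
        // The test exercises the helper directly instead of going through run().
        assert new Worker().normalize("  geoip  ").equals("geoip");
    }
}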
org/opensearch/geospatial/ip2geo/action/PutDatasourceRequest.java

@@ -14,6 +14,7 @@
 import java.net.URL;
 import java.util.Locale;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.log4j.Log4j2;
@@ -28,11 +29,12 @@
 import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
 
 /**
- * GeoIP datasource creation request
+ * Ip2Geo datasource creation request
  */
 @Getter
 @Setter
 @Log4j2
+@EqualsAndHashCode
 public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceRequest> {
     private static final ParseField ENDPOINT_FIELD = new ParseField("endpoint");
     private static final ParseField UPDATE_INTERVAL_IN_DAYS_FIELD = new ParseField("update_interval_in_days");
@@ -47,10 +49,10 @@ public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceReque
      */
    private String endpoint;
     /**
-     * @param updateIntervalInDays update interval of a datasource
+     * @param updateInterval update interval of a datasource
      * @return update interval of a datasource
      */
-    private TimeValue updateIntervalInDays;
+    private TimeValue updateInterval;
 
     /**
      * Parser of a datasource
@@ -59,7 +61,7 @@ public class PutDatasourceRequest extends AcknowledgedRequest<PutDatasourceReque
     static {
         PARSER = new ObjectParser<>("put_datasource");
         PARSER.declareString((request, val) -> request.setEndpoint(val), ENDPOINT_FIELD);
-        PARSER.declareLong((request, val) -> request.setUpdateIntervalInDays(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD);
+        PARSER.declareLong((request, val) -> request.setUpdateInterval(TimeValue.timeValueDays(val)), UPDATE_INTERVAL_IN_DAYS_FIELD);
     }
 
     /**
@@ -79,15 +81,15 @@ public PutDatasourceRequest(final StreamInput in) throws IOException {
         super(in);
         this.datasourceName = in.readString();
         this.endpoint = in.readString();
-        this.updateIntervalInDays = in.readTimeValue();
+        this.updateInterval = in.readTimeValue();
     }
 
     @Override
     public void writeTo(final StreamOutput out) throws IOException {
         super.writeTo(out);
         out.writeString(datasourceName);
         out.writeString(endpoint);
-        out.writeTimeValue(updateIntervalInDays);
+        out.writeTimeValue(updateInterval);
     }
 
     @Override
@@ -120,7 +122,7 @@ private void validateEndpoint(final ActionRequestValidationException errors) {
      * Conduct following validation on url
      * 1. can read manifest file from the endpoint
      * 2. the url in the manifest file complies with RFC-2396
-     * 3. updateIntervalInDays is less than validForInDays value in the manifest file
+     * 3. updateInterval is less than validForInDays value in the manifest file
      *
      * @param url the url to validate
      * @param errors the errors to add error messages
@@ -143,25 +145,25 @@ private void validateManifestFile(final URL url, final ActionRequestValidationEx
             return;
         }
 
-        if (updateIntervalInDays.days() >= manifest.getValidForInDays()) {
+        if (updateInterval.days() >= manifest.getValidForInDays()) {
             errors.addValidationError(
                 String.format(
                     Locale.ROOT,
-                    "updateInterval %d is should be smaller than %d",
-                    updateIntervalInDays.days(),
+                    "updateInterval %d should be smaller than %d",
+                    updateInterval.days(),
                     manifest.getValidForInDays()
                 )
             );
         }
     }
 
     /**
-     * Validate updateIntervalInDays is larger than 0
+     * Validate updateInterval is equal or larger than 1
      *
      * @param errors the errors to add error messages
      */
     private void validateUpdateInterval(final ActionRequestValidationException errors) {
-        if (updateIntervalInDays.compareTo(TimeValue.timeValueDays(1)) > 0) {
+        if (updateInterval.compareTo(TimeValue.timeValueDays(1)) < 0) {
             errors.addValidationError("Update interval should be equal to or larger than 1 day");
         }
     }
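Two behavioral details in this file sit alongside the renames. First, @EqualsAndHashCode gives requests value equality, which unit tests typically rely on when comparing a request against a copy round-tripped through parsing or stream serialization. Second, the last hunk is a genuine bug fix, not just a rename: the removed check compareTo(TimeValue.timeValueDays(1)) > 0 flagged intervals larger than one day and silently accepted sub-day intervals, while the new < 0 comparison finally matches the error message. A hedged test sketch for that fix — it assumes validate() invokes validateUpdateInterval() and that a name-only constructor exists, neither of which appears in this excerpt:

import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.unit.TimeValue;

public class PutDatasourceRequestTests {
    public void testSubDayUpdateIntervalIsRejected() {
        PutDatasourceRequest request = new PutDatasourceRequest("my-datasource"); // constructor shape is an assumption
        request.setEndpoint("https://example.com/manifest.json"); // hypothetical endpoint
        request.setUpdateInterval(TimeValue.timeValueHours(12)); // below the one-day minimum

        ActionRequestValidationException exception = request.validate();

        // Expected: "Update interval should be equal to or larger than 1 day" among the errors.
        // Under the removed '> 0' comparison this request would have passed this check.
        assert exception != null && exception.validationErrors().isEmpty() == false;
    }
}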
org/opensearch/geospatial/ip2geo/action/PutDatasourceTransportAction.java

@@ -8,16 +8,10 @@
 
 package org.opensearch.geospatial.ip2geo.action;
 
-import java.io.IOException;
-import java.net.URL;
-import java.time.Duration;
 import java.time.Instant;
-import java.util.Iterator;
 
 import lombok.extern.log4j.Log4j2;
 
-import org.apache.commons.csv.CSVParser;
-import org.apache.commons.csv.CSVRecord;
 import org.opensearch.ResourceAlreadyExistsException;
 import org.opensearch.action.ActionListener;
 import org.opensearch.action.DocWriteRequest;
@@ -28,19 +22,14 @@
 import org.opensearch.action.support.WriteRequest;
 import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.client.Client;
-import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.inject.Inject;
-import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Settings;
-import org.opensearch.common.unit.TimeValue;
 import org.opensearch.common.xcontent.json.JsonXContent;
-import org.opensearch.geospatial.ip2geo.common.DatasourceHelper;
-import org.opensearch.geospatial.ip2geo.common.DatasourceManifest;
+import org.opensearch.geospatial.annotation.VisibleForTesting;
+import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
 import org.opensearch.geospatial.ip2geo.common.DatasourceState;
-import org.opensearch.geospatial.ip2geo.common.GeoIpDataHelper;
-import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
 import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
 import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
+import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
 import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.tasks.Task;
 import org.opensearch.threadpool.ThreadPool;
@@ -52,138 +41,96 @@
 @Log4j2
 public class PutDatasourceTransportAction extends HandledTransportAction<PutDatasourceRequest, AcknowledgedResponse> {
     private final Client client;
-    private final ClusterService clusterService;
     private final ThreadPool threadPool;
-
-    private TimeValue timeout;
-    private int indexingBulkSize;
+    private final DatasourceFacade datasourceFacade;
+    private final DatasourceUpdateService datasourceUpdateService;
 
     /**
      * Default constructor
      * @param transportService the transport service
      * @param actionFilters the action filters
      * @param client the client
-     * @param clusterService the cluster service
      * @param threadPool the thread pool
-     * @param settings the settings
-     * @param clusterSettings the cluster settings
      */
     @Inject
     public PutDatasourceTransportAction(
         final TransportService transportService,
         final ActionFilters actionFilters,
         final Client client,
-        final ClusterService clusterService,
         final ThreadPool threadPool,
-        final Settings settings,
-        final ClusterSettings clusterSettings
+        final DatasourceFacade datasourceFacade,
+        final DatasourceUpdateService datasourceUpdateService
     ) {
         super(PutDatasourceAction.NAME, transportService, actionFilters, PutDatasourceRequest::new);
         this.client = client;
-        this.clusterService = clusterService;
         this.threadPool = threadPool;
-        timeout = Ip2GeoSettings.TIMEOUT_IN_SECONDS.get(settings);
-        clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.TIMEOUT_IN_SECONDS, newValue -> timeout = newValue);
-        indexingBulkSize = Ip2GeoSettings.INDEXING_BULK_SIZE.get(settings);
-        clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.INDEXING_BULK_SIZE, newValue -> indexingBulkSize = newValue);
+        this.datasourceFacade = datasourceFacade;
+        this.datasourceUpdateService = datasourceUpdateService;
     }
 
     @Override
     protected void doExecute(final Task task, final PutDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
         try {
-            Datasource jobParameter = Datasource.Builder.build(request);
+            Datasource datasource = Datasource.Builder.build(request);
             IndexRequest indexRequest = new IndexRequest().index(DatasourceExtension.JOB_INDEX_NAME)
-                .id(jobParameter.getId())
-                .source(jobParameter.toXContent(JsonXContent.contentBuilder(), null))
+                .id(datasource.getId())
+                .source(datasource.toXContent(JsonXContent.contentBuilder(), null))
                 .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
                 .opType(DocWriteRequest.OpType.CREATE);
-            client.index(indexRequest, new ActionListener<>() {
-                @Override
-                public void onResponse(final IndexResponse indexResponse) {
-                    // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread
-                    // pool.
-                    threadPool.generic().submit(() -> {
-                        try {
-                            createDatasource(jobParameter);
-                        } catch (Exception e) {
-                            log.error("Failed to create datasource for {}", jobParameter.getId(), e);
-                            jobParameter.getUpdateStats().setLastFailedAt(Instant.now());
-                            jobParameter.setState(DatasourceState.FAILED);
-                            try {
-                                DatasourceHelper.updateDatasource(client, jobParameter, timeout);
-                            } catch (Exception ex) {
-                                log.error("Failed to mark datasource state as FAILED for {}", jobParameter.getId(), ex);
-                            }
-                        }
-                    });
-                    listener.onResponse(new AcknowledgedResponse(true));
-                }
-
-                @Override
-                public void onFailure(final Exception e) {
-                    if (e instanceof VersionConflictEngineException) {
-                        listener.onFailure(
-                            new ResourceAlreadyExistsException("datasource [{}] already exists", request.getDatasourceName())
-                        );
-                    } else {
-                        listener.onFailure(e);
-                    }
-                }
-            });
+            client.index(indexRequest, getIndexResponseListener(datasource, listener));
         } catch (Exception e) {
             listener.onFailure(e);
         }
     }
 
-    private void createDatasource(final Datasource jobParameter) throws Exception {
-        if (!DatasourceState.PREPARING.equals(jobParameter.getState())) {
-            log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.AVAILABLE, jobParameter.getState());
-            jobParameter.setState(DatasourceState.FAILED);
-            jobParameter.getUpdateStats().setLastFailedAt(Instant.now());
-            DatasourceHelper.updateDatasource(client, jobParameter, timeout);
-            return;
-        }
+    @VisibleForTesting
+    protected ActionListener<IndexResponse> getIndexResponseListener(
+        final Datasource datasource,
+        final ActionListener<AcknowledgedResponse> listener
+    ) {
+        return new ActionListener<>() {
+            @Override
+            public void onResponse(final IndexResponse indexResponse) {
+                // This is user initiated request. Therefore, we want to handle the first datasource update task in a generic thread
+                // pool.
+                threadPool.generic().submit(() -> { createDatasource(datasource); });
+                listener.onResponse(new AcknowledgedResponse(true));
            }
 
-        URL url = new URL(jobParameter.getEndpoint());
-        DatasourceManifest manifest = DatasourceManifest.Builder.build(url);
-        String indexName = setupIndex(manifest, jobParameter);
-        Instant startTime = Instant.now();
-        String[] fields = putIp2GeoData(indexName, manifest);
-        Instant endTime = Instant.now();
-        updateJobParameterAsSucceeded(jobParameter, manifest, fields, startTime, endTime);
-        log.info("GeoIP database[{}] creation succeeded after {} seconds", jobParameter.getId(), Duration.between(startTime, endTime));
+            @Override
+            public void onFailure(final Exception e) {
+                if (e instanceof VersionConflictEngineException) {
+                    listener.onFailure(new ResourceAlreadyExistsException("datasource [{}] already exists", datasource.getId()));
+                } else {
+                    listener.onFailure(e);
+                }
+            }
+        };
     }
 
-    private void updateJobParameterAsSucceeded(
-        final Datasource jobParameter,
-        final DatasourceManifest manifest,
-        final String[] fields,
-        final Instant startTime,
-        final Instant endTime
-    ) throws IOException {
-        jobParameter.setDatabase(manifest, fields);
-        jobParameter.getUpdateStats().setLastSucceededAt(endTime);
-        jobParameter.getUpdateStats().setLastProcessingTimeInMillis(endTime.toEpochMilli() - startTime.toEpochMilli());
-        jobParameter.enable();
-        jobParameter.setState(DatasourceState.AVAILABLE);
-        DatasourceHelper.updateDatasource(client, jobParameter, timeout);
-    }
+    @VisibleForTesting
+    protected void createDatasource(final Datasource datasource) {
+        if (DatasourceState.CREATING.equals(datasource.getState()) == false) {
+            log.error("Invalid datasource state. Expecting {} but received {}", DatasourceState.CREATING, datasource.getState());
+            markDatasourceAsCreateFailed(datasource);
+            return;
+        }
 
-    private String setupIndex(final DatasourceManifest manifest, final Datasource jobParameter) throws IOException {
-        String indexName = jobParameter.indexNameFor(manifest);
-        jobParameter.getIndices().add(indexName);
-        DatasourceHelper.updateDatasource(client, jobParameter, timeout);
-        GeoIpDataHelper.createIndexIfNotExists(clusterService, client, indexName, timeout);
-        return indexName;
+        try {
+            datasourceUpdateService.updateOrCreateGeoIpData(datasource);
+        } catch (Exception e) {
+            log.error("Failed to create datasource for {}", datasource.getId(), e);
+            markDatasourceAsCreateFailed(datasource);
        }
     }
 
-    private String[] putIp2GeoData(final String indexName, final DatasourceManifest manifest) throws IOException {
-        String[] fields;
-        try (CSVParser reader = GeoIpDataHelper.getDatabaseReader(manifest)) {
-            Iterator<CSVRecord> iter = reader.iterator();
-            fields = iter.next().values();
-            GeoIpDataHelper.putGeoData(client, indexName, fields, iter, indexingBulkSize, timeout);
+    private void markDatasourceAsCreateFailed(final Datasource datasource) {
+        datasource.getUpdateStats().setLastFailedAt(Instant.now());
+        datasource.setState(DatasourceState.CREATE_FAILED);
+        try {
+            datasourceFacade.updateDatasource(datasource);
+        } catch (Exception e) {
+            log.error("Failed to mark datasource state as CREATE_FAILED for {}", datasource.getId(), e);
         }
-        return fields;
     }
 }
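Extracting the anonymous listener into the @VisibleForTesting getIndexResponseListener() method is what makes the error-mapping branch unit-testable: a test can drive onFailure directly instead of arranging a real duplicate index write. An illustrative sketch under that assumption, using Mockito-style mocks — the test name and setup are hypothetical, and the test is assumed to live in the same package, which is what the protected access implies:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.index.engine.VersionConflictEngineException;

public class PutDatasourceTransportActionTests {
    void testVersionConflictIsMappedToAlreadyExists(final PutDatasourceTransportAction action, final Datasource datasource) {
        @SuppressWarnings("unchecked")
        final ActionListener<AcknowledgedResponse> wrapped = mock(ActionListener.class);
        final ActionListener<IndexResponse> listener = action.getIndexResponseListener(datasource, wrapped);

        // A version conflict on opType CREATE means the datasource document already exists.
        listener.onFailure(mock(VersionConflictEngineException.class));

        verify(wrapped).onFailure(any(ResourceAlreadyExistsException.class));
    }
}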
org/opensearch/geospatial/ip2geo/action/RestPutDatasourceHandler.java

@@ -17,7 +17,6 @@
 
 import org.opensearch.client.node.NodeClient;
 import org.opensearch.common.settings.ClusterSettings;
-import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.core.xcontent.XContentParser;
 import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
@@ -41,14 +40,10 @@
  */
 public class RestPutDatasourceHandler extends BaseRestHandler {
     private static final String ACTION_NAME = "ip2geo_datasource";
-    private String defaultDatasourceEndpoint;
-    private TimeValue defaultUpdateInterval;
+    private final ClusterSettings clusterSettings;
 
-    public RestPutDatasourceHandler(final Settings settings, final ClusterSettings clusterSettings) {
-        defaultDatasourceEndpoint = Ip2GeoSettings.DATASOURCE_ENDPOINT.get(settings);
-        clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.DATASOURCE_ENDPOINT, newValue -> defaultDatasourceEndpoint = newValue);
-        defaultUpdateInterval = Ip2GeoSettings.DATASOURCE_UPDATE_INTERVAL.get(settings);
-        clusterSettings.addSettingsUpdateConsumer(Ip2GeoSettings.DATASOURCE_UPDATE_INTERVAL, newValue -> defaultUpdateInterval = newValue);
+    public RestPutDatasourceHandler(final ClusterSettings clusterSettings) {
+        this.clusterSettings = clusterSettings;
     }
 
     @Override
@@ -65,10 +60,10 @@ protected RestChannelConsumer prepareRequest(final RestRequest request, final No
             }
         }
         if (putDatasourceRequest.getEndpoint() == null) {
-            putDatasourceRequest.setEndpoint(defaultDatasourceEndpoint);
+            putDatasourceRequest.setEndpoint(clusterSettings.get(Ip2GeoSettings.DATASOURCE_ENDPOINT));
         }
-        if (putDatasourceRequest.getUpdateIntervalInDays() == null) {
-            putDatasourceRequest.setUpdateIntervalInDays(defaultUpdateInterval);
+        if (putDatasourceRequest.getUpdateInterval() == null) {
+            putDatasourceRequest.setUpdateInterval(TimeValue.timeValueDays(clusterSettings.get(Ip2GeoSettings.DATASOURCE_UPDATE_INTERVAL)));
         }
         return channel -> client.executeLocally(PutDatasourceAction.INSTANCE, putDatasourceRequest, new RestToXContentListener<>(channel));
     }
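Reading through ClusterSettings at request time returns the current value of a dynamic setting, so the handler no longer needs mutable cached fields or addSettingsUpdateConsumer registrations. The commit message's TimeValue-to-Long change is also visible here: the raw Long is now wrapped with TimeValue.timeValueDays at the use site. A hypothetical sketch of what the corresponding definition in Ip2GeoSettings might look like — the setting key, default, and bounds are assumptions, since that file is not part of this excerpt:

import org.opensearch.common.settings.Setting;

// Hypothetical; the real definition lives in Ip2GeoSettings, which this page does not show.
public static final Setting<Long> DATASOURCE_UPDATE_INTERVAL = Setting.longSetting(
    "plugins.geospatial.ip2geo.datasource.update_interval_in_days", // key is an assumption
    3L, // assumed default, in days
    1L, // assumed minimum, matching the request-level "at least 1 day" validation
    Setting.Property.NodeScope,
    Setting.Property.Dynamic // dynamic, which is why reading the live value matters
);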