Skip to content

Commit

Permalink
Add restoring event listener (opensearch-project#328)
Browse files Browse the repository at this point in the history
In the listener, we trigger a geoip data update

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jul 13, 2023
1 parent 0dfbe7d commit e7ac455
Show file tree
Hide file tree
Showing 9 changed files with 431 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -133,6 +136,33 @@ public IndexResponse updateDatasource(final Datasource datasource) {
});
}

/**
* Update datasources in an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param datasources the datasources
* @param listener action listener
*/
public void updateDatasource(final List<Datasource> datasources, final ActionListener<BulkResponse> listener) {
BulkRequest bulkRequest = new BulkRequest();
datasources.stream().map(datasource -> {
datasource.setLastUpdateTime(Instant.now());
return datasource;
}).map(this::toIndexRequest).forEach(indexRequest -> bulkRequest.add(indexRequest));
StashedThreadContext.run(client, () -> client.bulk(bulkRequest, listener));
}

private IndexRequest toIndexRequest(Datasource datasource) {
try {
IndexRequest indexRequest = new IndexRequest();
indexRequest.index(DatasourceExtension.JOB_INDEX_NAME);
indexRequest.id(datasource.getName());
indexRequest.opType(DocWriteRequest.OpType.INDEX);
indexRequest.source(datasource.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS));
return indexRequest;
} catch (IOException e) {
throw new RuntimeException(e);
}
}

/**
* Put datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,15 @@ public void disable() {
* @return Current index name of a datasource
*/
public String currentIndexName() {
return isExpired() ? null : indexNameFor(database.updatedAt.toEpochMilli());
if (isExpired()) {
return null;
}

if (database.updatedAt == null) {
return null;
}

return indexNameFor(database.updatedAt.toEpochMilli());
}

/**
Expand All @@ -371,6 +379,14 @@ private String indexNameFor(final long suffix) {
return String.format(Locale.ROOT, "%s.%s.%d", IP2GEO_DATA_INDEX_NAME_PREFIX, name, suffix);
}

/**
* Reset database so that it can be updated in next run regardless there is new update or not
*/
public void resetDatabase() {
database.setUpdatedAt(null);
database.setSha256Hash(null);
}

/**
* Checks if datasource is expired or not
*
Expand Down Expand Up @@ -537,7 +553,7 @@ public Database(final StreamInput in) throws IOException {
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalString(provider);
out.writeOptionalString(sha256Hash);
out.writeOptionalVLong(updatedAt.toEpochMilli());
out.writeOptionalVLong(updatedAt == null ? null : updatedAt.toEpochMilli());
out.writeOptionalVLong(validForInDays);
out.writeOptionalStringCollection(fields);
}
Expand Down Expand Up @@ -640,10 +656,10 @@ public UpdateStats(final StreamInput in) throws IOException {

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeOptionalVLong(lastSucceededAt.toEpochMilli());
out.writeOptionalVLong(lastSucceededAt == null ? null : lastSucceededAt.toEpochMilli());
out.writeOptionalVLong(lastProcessingTimeInMillis);
out.writeOptionalVLong(lastFailedAt.toEpochMilli());
out.writeOptionalVLong(lastSkippedAt.toEpochMilli());
out.writeOptionalVLong(lastFailedAt == null ? null : lastFailedAt.toEpochMilli());
out.writeOptionalVLong(lastSkippedAt == null ? null : lastSkippedAt.toEpochMilli());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.listener;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.RestoreInProgress;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.threadpool.ThreadPool;

@Log4j2
@AllArgsConstructor(onConstructor = @__(@Inject))
public class Ip2GeoListener extends AbstractLifecycleComponent implements ClusterStateListener {
private static final int SCHEDULE_IN_MIN = 15;
private static final int DELAY_IN_MILLIS = 10000;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;

@Override
public void clusterChanged(final ClusterChangedEvent event) {
if (event.localNodeClusterManager() == false) {
return;
}

for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (RestoreInProgress.State.SUCCESS.equals(entry.state()) == false) {
continue;
}

if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) {
continue;
}

threadPool.generic().submit(() -> forceUpdateGeoIpData());
}
}

private void forceUpdateGeoIpData() {
datasourceFacade.getAllDatasources(new ActionListener<>() {
@Override
public void onResponse(final List<Datasource> datasources) {
datasources.stream().forEach(Ip2GeoListener.this::scheduleForceUpdate);
datasourceFacade.updateDatasource(datasources, new ActionListener<>() {
@Override
public void onResponse(final BulkResponse bulkItemResponses) {
log.info("Datasources are updated for cleanup");
}

@Override
public void onFailure(final Exception e) {
log.error("Failed to update datasource for cleanup after restoring", e);
}
});
}

@Override
public void onFailure(final Exception e) {
log.error("Failed to get datasource after restoring", e);
}
});
}

/**
* Give a delay so that job scheduler can schedule the job right after the delay. Otherwise, it schedules
* the job after specified update interval.
*/
private void scheduleForceUpdate(Datasource datasource) {
IntervalSchedule schedule = new IntervalSchedule(Instant.now(), SCHEDULE_IN_MIN, ChronoUnit.MINUTES, DELAY_IN_MILLIS);
datasource.resetDatabase();
datasource.setSystemSchedule(schedule);
datasource.setTask(DatasourceTask.ALL);
}

@Override
protected void doStart() {
if (DiscoveryNode.isClusterManagerNode(clusterService.getSettings())) {
clusterService.addListener(this);
}
}

@Override
protected void doStop() {
clusterService.removeListener(this);
}

@Override
protected void doClose() throws IOException {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.MapBuilder;
import org.opensearch.common.component.LifecycleComponent;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand Down Expand Up @@ -55,6 +56,7 @@
import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceRunner;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService;
import org.opensearch.geospatial.ip2geo.listener.Ip2GeoListener;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.geospatial.processor.FeatureProcessor;
import org.opensearch.geospatial.rest.action.upload.geojson.RestUploadGeoJSONAction;
Expand Down Expand Up @@ -107,6 +109,11 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
.immutableMap();
}

@Override
public Collection<Class<? extends LifecycleComponent>> getGuiceServiceClasses() {
return List.of(Ip2GeoListener.class);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
Expand Down Expand Up @@ -311,6 +312,27 @@ public void testGetAllDatasources_whenValidInput_thenSucceed() {
assertEquals(datasources, captor.getValue());
}

public void testUpdateDatasource_whenValidInput_thenUpdate() {
List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());

verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
// Verify
assertTrue(actionRequest instanceof BulkRequest);
BulkRequest bulkRequest = (BulkRequest) actionRequest;
assertEquals(2, bulkRequest.requests().size());
for (int i = 0; i < bulkRequest.requests().size(); i++) {
IndexRequest request = (IndexRequest) bulkRequest.requests().get(i);
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(datasources.get(i).getName(), request.id());
assertEquals(DocWriteRequest.OpType.INDEX, request.opType());
assertTrue(request.source().utf8ToString().contains(datasources.get(i).getEndpoint()));
}
return null;
});

datasourceFacade.updateDatasource(datasources, mock(ActionListener.class));
}

private SearchHits getMockedSearchHits(List<Datasource> datasources) {
SearchHit[] searchHitArray = datasources.stream().map(this::toBytesReference).map(this::toSearchHit).toArray(SearchHit[]::new);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ public void testCurrentIndexName_whenExpired_thenReturnNull() {
assertNull(datasource.currentIndexName());
}

public void testCurrentIndexName_whenDatabaseUpdateDateIsNull_thenReturnNull() {
String id = GeospatialTestHelper.randomLowerCaseString();
Datasource datasource = new Datasource();
datasource.setName(id);
datasource.getDatabase().setProvider("provider");
datasource.getDatabase().setSha256Hash("sha256Hash");
datasource.getDatabase().setUpdatedAt(null);
datasource.getDatabase().setValidForInDays(1l);
datasource.getUpdateStats().setLastSucceededAt(Instant.now());
datasource.getDatabase().setFields(new ArrayList<>());

assertFalse(datasource.isExpired());
assertNull(datasource.currentIndexName());
}

public void testGetIndexNameFor() {
long updatedAt = randomPositiveLong();
DatasourceManifest manifest = mock(DatasourceManifest.class);
Expand All @@ -92,6 +107,19 @@ public void testGetIndexNameFor() {
);
}

public void testResetDatabase_whenCalled_thenNullifySomeFields() {
Datasource datasource = randomDatasource();
assertNotNull(datasource.getDatabase().getSha256Hash());
assertNotNull(datasource.getDatabase().getUpdatedAt());

// Run
datasource.resetDatabase();

// Verify
assertNull(datasource.getDatabase().getSha256Hash());
assertNull(datasource.getDatabase().getUpdatedAt());
}

public void testIsExpired_whenCalled_thenExpectedValue() {
Datasource datasource = new Datasource();
// never expire if validForInDays is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,35 @@ public void testUpdateOrCreateGeoIpData_whenHashValueIsSame_thenSkipUpdate() {
verify(datasourceFacade).updateDatasource(datasource);
}

@SneakyThrows
public void testUpdateOrCreateGeoIpData_whenExpired_thenUpdate() {
File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
DatasourceManifest manifest = DatasourceManifest.Builder.build(manifestFile.toURI().toURL());

File sampleFile = new File(this.getClass().getClassLoader().getResource("ip2geo/sample_valid.csv").getFile());
when(geoIpDataFacade.getDatabaseReader(any())).thenReturn(CSVParser.parse(sampleFile, StandardCharsets.UTF_8, CSVFormat.RFC4180));

Datasource datasource = new Datasource();
datasource.setState(DatasourceState.AVAILABLE);
datasource.getDatabase().setUpdatedAt(Instant.ofEpochMilli(manifest.getUpdatedAt()));
datasource.getDatabase().setSha256Hash(manifest.getSha256Hash());
datasource.getDatabase().setValidForInDays(1l);
datasource.setEndpoint(manifestFile.toURI().toURL().toExternalForm());
datasource.resetDatabase();

// Run
datasourceUpdateService.updateOrCreateGeoIpData(datasource, mock(Runnable.class));

// Verify
verify(geoIpDataFacade).putGeoIpData(
eq(datasource.currentIndexName()),
isA(String[].class),
any(Iterator.class),
anyInt(),
any(Runnable.class)
);
}

@SneakyThrows
public void testUpdateOrCreateGeoIpData_whenInvalidData_thenThrowException() {
File manifestFile = new File(this.getClass().getClassLoader().getResource("ip2geo/manifest.json").getFile());
Expand Down
Loading

0 comments on commit e7ac455

Please sign in to comment.