Skip to content

Commit

Permalink
Delete GeoIP data indices after restoring complete
Browse files Browse the repository at this point in the history
We don't want to use restored GeoIP data indices. Therefore we
delete the indices once restoring process complete.

When GeoIP metadata index is restored, we create a new GeoIP data index instead.

Signed-off-by: Heemin Kim <[email protected]>
  • Loading branch information
heemin32 committed Jun 8, 2023
1 parent d5bea75 commit 64af11d
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
Expand All @@ -36,6 +37,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;

/**
Expand All @@ -53,12 +55,14 @@ public DeleteDatasourceTransportAction(
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade,
final Ip2GeoProcessorFacade ip2GeoProcessorFacade
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
}

Expand Down Expand Up @@ -97,6 +101,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException
}

setDatasourceStateAsDeleting(datasource);
geoIpDataFacade.deleteIp2GeoDataIndex(datasource.getIndices());
datasourceFacade.deleteDatasource(datasource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -192,15 +191,6 @@ public void putDatasource(final Datasource datasource, final ActionListener list
*
*/
public void deleteDatasource(final Datasource datasource) {
if (client.admin()
.indices()
.prepareDelete(datasource.getIndices().toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices()));
}
DeleteResponse response = client.prepareDelete()
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
Expand Down Expand Up @@ -387,22 +389,38 @@ public void putGeoIpData(
freezeIndex(indexName);
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
if (index == null || index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false) {
public void deleteIp2GeoDataIndex(final String index) {
deleteIp2GeoDataIndex(Arrays.asList(index));
}

public void deleteIp2GeoDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
}

Optional<String> invalidIndex = indices.stream()
.filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false)
.findAny();
if (invalidIndex.isPresent()) {
throw new OpenSearchException(
"the index[{}] is not ip2geo data index which should start with {}",
index,
invalidIndex.get(),
IP2GEO_DATA_INDEX_NAME_PREFIX
);
}
return StashedThreadContext.run(

AcknowledgedResponse response = StashedThreadContext.run(
client,
() -> client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.prepareDelete(indices.toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);

if (response.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
/**
* Prefix of indices having Ip2Geo data
*/
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".geospatial.ip2geo.data";

/**
* Default fields for job scheduling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,8 @@ private List<String> deleteIndices(final List<String> indicesToDelete) {
}

try {
if (geoIpDataFacade.deleteIp2GeoDataIndex(index).isAcknowledged()) {
deletedIndices.add(index);
} else {
log.error("Failed to delete an index [{}]", index);
}
geoIpDataFacade.deleteIp2GeoDataIndex(index);
deletedIndices.add(index);
} catch (Exception e) {
log.error("Failed to delete an index [{}]", index, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.geospatial.ip2geo.listener;

import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand All @@ -23,6 +26,7 @@
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
Expand All @@ -37,6 +41,7 @@ public class Ip2GeoListener extends AbstractLifecycleComponent implements Cluste
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;

@Override
public void clusterChanged(final ClusterChangedEvent event) {
Expand All @@ -49,11 +54,17 @@ public void clusterChanged(final ClusterChangedEvent event) {
continue;
}

if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) {
continue;
if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index))) {
threadPool.generic().submit(() -> forceUpdateGeoIpData());
}

threadPool.generic().submit(() -> forceUpdateGeoIpData());
List<String> ip2GeoDataIndices = entry.indices()
.stream()
.filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX))
.collect(Collectors.toList());
if (ip2GeoDataIndices.isEmpty() == false) {
threadPool.generic().submit(() -> geoIpDataFacade.deleteIp2GeoDataIndex(ip2GeoDataIndices));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -208,7 +209,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) {
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.setCurrentIndex(datasource.newIndexName(UUID.randomUUID().toString()));
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
Expand All @@ -45,6 +47,7 @@ public void init() {
ip2GeoLockService,
ingestService,
datasourceFacade,
geoIpDataFacade,
ip2GeoProcessorFacade
);
}
Expand Down Expand Up @@ -118,7 +121,9 @@ public void testDeleteDatasource_whenSafeToDelete_thenDelete() {
// Verify
assertEquals(DatasourceState.DELETING, datasource.getState());
verify(datasourceFacade).updateDatasource(datasource);
verify(datasourceFacade).deleteDatasource(datasource);
InOrder inOrder = Mockito.inOrder(geoIpDataFacade, datasourceFacade);
inOrder.verify(geoIpDataFacade).deleteIp2GeoDataIndex(datasource.getIndices());
inOrder.verify(datasourceFacade).deleteDatasource(datasource);
}

@SneakyThrows
Expand All @@ -136,6 +141,7 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti
// Verify
assertEquals(DatasourceState.AVAILABLE, datasource.getState());
verify(datasourceFacade, never()).updateDatasource(datasource);
verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices());
verify(datasourceFacade, never()).deleteDatasource(datasource);
}

Expand All @@ -154,6 +160,7 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE

// Verify
verify(datasourceFacade, times(2)).updateDatasource(datasource);
verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices());
verify(datasourceFacade, never()).deleteDatasource(datasource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -38,9 +38,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -218,36 +216,36 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime

public void testDeleteDatasource_whenValidInput_thenSucceed() {
Datasource datasource = randomDatasource();
verifyingClient.setExecuteVerifier(
(actionResponse, actionRequest) -> {
// Verify
if (actionRequest instanceof DeleteIndexRequest) {
DeleteIndexRequest request = (DeleteIndexRequest) actionRequest;
assertEquals(datasource.getIndices().size(), request.indices().length);
assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions());

AcknowledgedResponse response = new AcknowledgedResponse(true);
return response;
} else if (actionRequest instanceof DeleteRequest) {
DeleteRequest request = (DeleteRequest) actionRequest;
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(DocWriteRequest.OpType.DELETE, request.opType());
assertEquals(datasource.getName(), request.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy());

DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.OK);
return response;
} else {
throw new RuntimeException("Not expected request type is passed" + actionRequest.getClass());
}
}
);
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
// Verify
assertTrue(actionRequest instanceof DeleteRequest);
DeleteRequest request = (DeleteRequest) actionRequest;
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(DocWriteRequest.OpType.DELETE, request.opType());
assertEquals(datasource.getName(), request.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy());

DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.OK);
return response;
});

// Run
datasourceFacade.deleteDatasource(datasource);
}

public void testDeleteDatasource_whenIndexNotFound_thenThrowException() {
Datasource datasource = randomDatasource();
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.NOT_FOUND);
return response;
});

// Run
expectThrows(ResourceNotFoundException.class, () -> datasourceFacade.deleteDatasource(datasource));
}

public void testGetDatasources_whenValidInput_thenSucceed() {
List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());
String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
import org.opensearch.common.SuppressForbidden;
Expand Down Expand Up @@ -168,15 +168,14 @@ public void testInternalGetDatabaseReader_whenCalled_thenSetUserAgent() {
verify(connection).addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE);
}

public void testDeleteIp2GeoDataIndex() {
public void testDeleteIp2GeoDataIndex_whenCalled_thenDeleteIndex() {
String index = String.format(Locale.ROOT, "%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, GeospatialTestHelper.randomLowerCaseString());
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assertTrue(actionRequest instanceof DeleteIndexRequest);
DeleteIndexRequest request = (DeleteIndexRequest) actionRequest;
assertEquals(1, request.indices().length);
assertEquals(index, request.indices()[0]);
assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions());
return null;
return new AcknowledgedResponse(true);
});
verifyingGeoIpDataFacade.deleteIp2GeoDataIndex(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.commons.csv.CSVParser;
import org.junit.Before;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -199,13 +198,13 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() {
when(metadata.hasIndex(currentIndex)).thenReturn(true);
when(metadata.hasIndex(oldIndex)).thenReturn(true);
when(metadata.hasIndex(lingeringIndex)).thenReturn(false);
when(geoIpDataFacade.deleteIp2GeoDataIndex(any())).thenReturn(new AcknowledgedResponse(true));

datasourceUpdateService.deleteUnusedIndices(datasource);

assertEquals(1, datasource.getIndices().size());
assertEquals(currentIndex, datasource.getIndices().get(0));
verify(datasourceFacade).updateDatasource(datasource);
verify(geoIpDataFacade).deleteIp2GeoDataIndex(oldIndex);
}

public void testUpdateDatasource_whenNoChange_thenNoUpdate() {
Expand Down
Loading

0 comments on commit 64af11d

Please sign in to comment.