Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete GeoIP data indices after restoring complete #334

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.geospatial.exceptions.ResourceInUseException;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
Expand All @@ -36,6 +37,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction<Dele
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;
private final Ip2GeoProcessorFacade ip2GeoProcessorFacade;

/**
Expand All @@ -53,12 +55,14 @@ public DeleteDatasourceTransportAction(
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade,
final GeoIpDataFacade geoIpDataFacade,
final Ip2GeoProcessorFacade ip2GeoProcessorFacade
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
this.geoIpDataFacade = geoIpDataFacade;
this.ip2GeoProcessorFacade = ip2GeoProcessorFacade;
}

Expand Down Expand Up @@ -97,6 +101,7 @@ protected void deleteDatasource(final String datasourceName) throws IOException
}

setDatasourceStateAsDeleting(datasource);
geoIpDataFacade.deleteIp2GeoDataIndex(datasource.getIndices());
datasourceFacade.deleteDatasource(datasource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -192,15 +191,6 @@ public void putDatasource(final Datasource datasource, final ActionListener list
*
*/
public void deleteDatasource(final Datasource datasource) {
if (client.admin()
.indices()
.prepareDelete(datasource.getIndices().toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices()));
}
DeleteResponse response = client.prepareDelete()
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
Expand Down Expand Up @@ -387,22 +389,38 @@ public void putGeoIpData(
freezeIndex(indexName);
}

public AcknowledgedResponse deleteIp2GeoDataIndex(final String index) {
if (index == null || index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false) {
public void deleteIp2GeoDataIndex(final String index) {
deleteIp2GeoDataIndex(Arrays.asList(index));
}

public void deleteIp2GeoDataIndex(final List<String> indices) {
if (indices == null || indices.isEmpty()) {
return;
}

Optional<String> invalidIndex = indices.stream()
.filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false)
.findAny();
if (invalidIndex.isPresent()) {
throw new OpenSearchException(
"the index[{}] is not ip2geo data index which should start with {}",
index,
invalidIndex.get(),
IP2GEO_DATA_INDEX_NAME_PREFIX
);
}
return StashedThreadContext.run(

AcknowledgedResponse response = StashedThreadContext.run(
client,
() -> client.admin()
.indices()
.prepareDelete(index)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.prepareDelete(indices.toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
);

if (response.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class Datasource implements Writeable, ScheduledJobParameter {
/**
* Prefix of indices having Ip2Geo data
*/
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data";
public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".geospatial.ip2geo.data";

/**
* Default fields for job scheduling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,8 @@ private List<String> deleteIndices(final List<String> indicesToDelete) {
}

try {
if (geoIpDataFacade.deleteIp2GeoDataIndex(index).isAcknowledged()) {
deletedIndices.add(index);
} else {
log.error("Failed to delete an index [{}]", index);
}
geoIpDataFacade.deleteIp2GeoDataIndex(index);
deletedIndices.add(index);
} catch (Exception e) {
log.error("Failed to delete an index [{}]", index, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@

package org.opensearch.geospatial.ip2geo.listener;

import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX;

import java.io.IOException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand All @@ -23,6 +26,7 @@
import org.opensearch.common.component.AbstractLifecycleComponent;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension;
import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask;
Expand All @@ -37,6 +41,7 @@ public class Ip2GeoListener extends AbstractLifecycleComponent implements Cluste
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final DatasourceFacade datasourceFacade;
private final GeoIpDataFacade geoIpDataFacade;

@Override
public void clusterChanged(final ClusterChangedEvent event) {
Expand All @@ -49,11 +54,17 @@ public void clusterChanged(final ClusterChangedEvent event) {
continue;
}

if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) {
continue;
if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index))) {
threadPool.generic().submit(() -> forceUpdateGeoIpData());
}

threadPool.generic().submit(() -> forceUpdateGeoIpData());
List<String> ip2GeoDataIndices = entry.indices()
.stream()
.filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX))
.collect(Collectors.toList());
if (ip2GeoDataIndices.isEmpty() == false) {
threadPool.generic().submit(() -> geoIpDataFacade.deleteIp2GeoDataIndex(ip2GeoDataIndices));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -208,7 +209,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) {
datasource.setSystemSchedule(datasource.getUserSchedule());
datasource.setTask(randomTask());
datasource.setState(randomState());
datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString());
datasource.setCurrentIndex(datasource.newIndexName(UUID.randomUUID().toString()));
datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()));
datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString()));
datasource.getDatabase()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
Expand All @@ -45,6 +47,7 @@ public void init() {
ip2GeoLockService,
ingestService,
datasourceFacade,
geoIpDataFacade,
ip2GeoProcessorFacade
);
}
Expand Down Expand Up @@ -118,7 +121,9 @@ public void testDeleteDatasource_whenSafeToDelete_thenDelete() {
// Verify
assertEquals(DatasourceState.DELETING, datasource.getState());
verify(datasourceFacade).updateDatasource(datasource);
verify(datasourceFacade).deleteDatasource(datasource);
InOrder inOrder = Mockito.inOrder(geoIpDataFacade, datasourceFacade);
inOrder.verify(geoIpDataFacade).deleteIp2GeoDataIndex(datasource.getIndices());
inOrder.verify(datasourceFacade).deleteDatasource(datasource);
}

@SneakyThrows
Expand All @@ -136,6 +141,7 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti
// Verify
assertEquals(DatasourceState.AVAILABLE, datasource.getState());
verify(datasourceFacade, never()).updateDatasource(datasource);
verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices());
verify(datasourceFacade, never()).deleteDatasource(datasource);
}

Expand All @@ -154,6 +160,7 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE

// Verify
verify(datasourceFacade, times(2)).updateDatasource(datasource);
verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices());
verify(datasourceFacade, never()).deleteDatasource(datasource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
Expand All @@ -38,9 +38,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.xcontent.json.JsonXContent;
Expand Down Expand Up @@ -218,36 +216,36 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime

public void testDeleteDatasource_whenValidInput_thenSucceed() {
Datasource datasource = randomDatasource();
verifyingClient.setExecuteVerifier(
(actionResponse, actionRequest) -> {
// Verify
if (actionRequest instanceof DeleteIndexRequest) {
DeleteIndexRequest request = (DeleteIndexRequest) actionRequest;
assertEquals(datasource.getIndices().size(), request.indices().length);
assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions());

AcknowledgedResponse response = new AcknowledgedResponse(true);
return response;
} else if (actionRequest instanceof DeleteRequest) {
DeleteRequest request = (DeleteRequest) actionRequest;
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(DocWriteRequest.OpType.DELETE, request.opType());
assertEquals(datasource.getName(), request.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy());

DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.OK);
return response;
} else {
throw new RuntimeException("Not expected request type is passed" + actionRequest.getClass());
}
}
);
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
// Verify
assertTrue(actionRequest instanceof DeleteRequest);
DeleteRequest request = (DeleteRequest) actionRequest;
assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index());
assertEquals(DocWriteRequest.OpType.DELETE, request.opType());
assertEquals(datasource.getName(), request.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy());

DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.OK);
return response;
});

// Run
datasourceFacade.deleteDatasource(datasource);
}

public void testDeleteDatasource_whenIndexNotFound_thenThrowException() {
Datasource datasource = randomDatasource();
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
DeleteResponse response = mock(DeleteResponse.class);
when(response.status()).thenReturn(RestStatus.NOT_FOUND);
return response;
});

// Run
expectThrows(ResourceNotFoundException.class, () -> datasourceFacade.deleteDatasource(datasource));
}

public void testGetDatasources_whenValidInput_thenSucceed() {
List<Datasource> datasources = Arrays.asList(randomDatasource(), randomDatasource());
String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.Randomness;
import org.opensearch.common.Strings;
import org.opensearch.common.SuppressForbidden;
Expand Down Expand Up @@ -168,15 +168,14 @@ public void testInternalGetDatabaseReader_whenCalled_thenSetUserAgent() {
verify(connection).addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE);
}

public void testDeleteIp2GeoDataIndex() {
public void testDeleteIp2GeoDataIndex_whenCalled_thenDeleteIndex() {
String index = String.format(Locale.ROOT, "%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, GeospatialTestHelper.randomLowerCaseString());
verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> {
assertTrue(actionRequest instanceof DeleteIndexRequest);
DeleteIndexRequest request = (DeleteIndexRequest) actionRequest;
assertEquals(1, request.indices().length);
assertEquals(index, request.indices()[0]);
assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions());
return null;
return new AcknowledgedResponse(true);
});
verifyingGeoIpDataFacade.deleteIp2GeoDataIndex(index);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.commons.csv.CSVParser;
import org.junit.Before;
import org.opensearch.OpenSearchException;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.SuppressForbidden;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase;
Expand Down Expand Up @@ -199,13 +198,13 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() {
when(metadata.hasIndex(currentIndex)).thenReturn(true);
when(metadata.hasIndex(oldIndex)).thenReturn(true);
when(metadata.hasIndex(lingeringIndex)).thenReturn(false);
when(geoIpDataFacade.deleteIp2GeoDataIndex(any())).thenReturn(new AcknowledgedResponse(true));

datasourceUpdateService.deleteUnusedIndices(datasource);

assertEquals(1, datasource.getIndices().size());
assertEquals(currentIndex, datasource.getIndices().get(0));
verify(datasourceFacade).updateDatasource(datasource);
verify(geoIpDataFacade).deleteIp2GeoDataIndex(oldIndex);
}

public void testUpdateDatasource_whenNoChange_thenNoUpdate() {
Expand Down
Loading