From 64af11dc802ecfbdc7461eb84f633b8b93913acb Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Wed, 7 Jun 2023 17:08:10 -0700 Subject: [PATCH] Delete GeoIP data indices after restoring complete We don't want to use restored GeoIP data indices. Therefore we delete the indices once restoring process complete. When GeoIP metadata index is restored, we create a new GeoIP data index instead. Signed-off-by: Heemin Kim --- .../DeleteDatasourceTransportAction.java | 5 ++ .../ip2geo/common/DatasourceFacade.java | 10 ---- .../ip2geo/common/GeoIpDataFacade.java | 30 ++++++++--- .../ip2geo/jobscheduler/Datasource.java | 2 +- .../jobscheduler/DatasourceUpdateService.java | 7 +-- .../ip2geo/listener/Ip2GeoListener.java | 17 ++++-- .../geospatial/ip2geo/Ip2GeoTestCase.java | 3 +- .../DeleteDatasourceTransportActionTests.java | 9 +++- .../ip2geo/common/DatasourceFacadeTests.java | 54 +++++++++---------- .../ip2geo/common/GeoIpDataFacadeTests.java | 7 ++- .../DatasourceUpdateServiceTests.java | 3 +- .../ip2geo/listener/Ip2GeoListenerTests.java | 28 +++++++++- 12 files changed, 113 insertions(+), 62 deletions(-) diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java index a4c6e5c4..a4706834 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -20,6 +20,7 @@ import org.opensearch.geospatial.exceptions.ResourceInUseException; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoProcessorFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; @@ -36,6 +37,7 @@ public class DeleteDatasourceTransportAction extends HandledTransportAction indices) { + if (indices == null || indices.isEmpty()) { + return; + } + + Optional invalidIndex = indices.stream() + .filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX) == false) + .findAny(); + if (invalidIndex.isPresent()) { throw new OpenSearchException( "the index[{}] is not ip2geo data index which should start with {}", - index, + invalidIndex.get(), IP2GEO_DATA_INDEX_NAME_PREFIX ); } - return StashedThreadContext.run( + + AcknowledgedResponse response = StashedThreadContext.run( client, () -> client.admin() .indices() - .prepareDelete(index) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .prepareDelete(indices.toArray(new String[0])) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) .execute() .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) ); + + if (response.isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", indices)); + } } } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java index dd63d746..b3d6b328 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/Datasource.java @@ -50,7 +50,7 @@ public class Datasource implements Writeable, ScheduledJobParameter { /** * Prefix of indices having Ip2Geo data */ - public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".ip2geo-data"; + public static final String IP2GEO_DATA_INDEX_NAME_PREFIX = ".geospatial.ip2geo.data"; /** * Default fields for job scheduling diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java index 579f5bbe..d4e55f79 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateService.java @@ -164,11 +164,8 @@ private List deleteIndices(final List indicesToDelete) { } try { - if (geoIpDataFacade.deleteIp2GeoDataIndex(index).isAcknowledged()) { - deletedIndices.add(index); - } else { - log.error("Failed to delete an index [{}]", index); - } + geoIpDataFacade.deleteIp2GeoDataIndex(index); + deletedIndices.add(index); } catch (Exception e) { log.error("Failed to delete an index [{}]", index, e); } diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java index e58ab4b9..a06bb30c 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListener.java @@ -5,10 +5,13 @@ package org.opensearch.geospatial.ip2geo.listener; +import static org.opensearch.geospatial.ip2geo.jobscheduler.Datasource.IP2GEO_DATA_INDEX_NAME_PREFIX; + import java.io.IOException; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.List; +import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -23,6 +26,7 @@ import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.inject.Inject; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceExtension; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceTask; @@ -37,6 +41,7 @@ public class Ip2GeoListener extends AbstractLifecycleComponent implements Cluste private final ClusterService clusterService; private final ThreadPool threadPool; private final DatasourceFacade datasourceFacade; + private final GeoIpDataFacade geoIpDataFacade; @Override public void clusterChanged(final ClusterChangedEvent event) { @@ -49,11 +54,17 @@ public void clusterChanged(final ClusterChangedEvent event) { continue; } - if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index)) == false) { - continue; + if (entry.indices().stream().anyMatch(index -> DatasourceExtension.JOB_INDEX_NAME.equals(index))) { + threadPool.generic().submit(() -> forceUpdateGeoIpData()); } - threadPool.generic().submit(() -> forceUpdateGeoIpData()); + List ip2GeoDataIndices = entry.indices() + .stream() + .filter(index -> index.startsWith(IP2GEO_DATA_INDEX_NAME_PREFIX)) + .collect(Collectors.toList()); + if (ip2GeoDataIndices.isEmpty() == false) { + threadPool.generic().submit(() -> geoIpDataFacade.deleteIp2GeoDataIndex(ip2GeoDataIndices)); + } } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index f8b40232..01f23e63 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -17,6 +17,7 @@ import java.util.HashSet; import java.util.Locale; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -208,7 +209,7 @@ protected Datasource randomDatasource(final Instant updateStartTime) { datasource.setSystemSchedule(datasource.getUserSchedule()); datasource.setTask(randomTask()); datasource.setState(randomState()); - datasource.setCurrentIndex(GeospatialTestHelper.randomLowerCaseString()); + datasource.setCurrentIndex(datasource.newIndexName(UUID.randomUUID().toString())); datasource.setIndices(Arrays.asList(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString())); datasource.setEndpoint(String.format(Locale.ROOT, "https://%s.com/manifest.json", GeospatialTestHelper.randomLowerCaseString())); datasource.getDatabase() diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java index 71447454..fa2afef8 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -23,6 +23,8 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mockito; import org.opensearch.OpenSearchException; import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; @@ -45,6 +47,7 @@ public void init() { ip2GeoLockService, ingestService, datasourceFacade, + geoIpDataFacade, ip2GeoProcessorFacade ); } @@ -118,7 +121,9 @@ public void testDeleteDatasource_whenSafeToDelete_thenDelete() { // Verify assertEquals(DatasourceState.DELETING, datasource.getState()); verify(datasourceFacade).updateDatasource(datasource); - verify(datasourceFacade).deleteDatasource(datasource); + InOrder inOrder = Mockito.inOrder(geoIpDataFacade, datasourceFacade); + inOrder.verify(geoIpDataFacade).deleteIp2GeoDataIndex(datasource.getIndices()); + inOrder.verify(datasourceFacade).deleteDatasource(datasource); } @SneakyThrows @@ -136,6 +141,7 @@ public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowExcepti // Verify assertEquals(DatasourceState.AVAILABLE, datasource.getState()); verify(datasourceFacade, never()).updateDatasource(datasource); + verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices()); verify(datasourceFacade, never()).deleteDatasource(datasource); } @@ -154,6 +160,7 @@ public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowE // Verify verify(datasourceFacade, times(2)).updateDatasource(datasource); + verify(geoIpDataFacade, never()).deleteIp2GeoDataIndex(datasource.getIndices()); verify(datasourceFacade, never()).deleteDatasource(datasource); } } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index 2a29afb1..43d8727e 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -22,11 +22,11 @@ import org.junit.Before; import org.mockito.ArgumentCaptor; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.bulk.BulkRequest; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.delete.DeleteResponse; @@ -38,9 +38,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; @@ -218,36 +216,36 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime public void testDeleteDatasource_whenValidInput_thenSucceed() { Datasource datasource = randomDatasource(); - verifyingClient.setExecuteVerifier( - (actionResponse, actionRequest) -> { - // Verify - if (actionRequest instanceof DeleteIndexRequest) { - DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; - assertEquals(datasource.getIndices().size(), request.indices().length); - assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); - - AcknowledgedResponse response = new AcknowledgedResponse(true); - return response; - } else if (actionRequest instanceof DeleteRequest) { - DeleteRequest request = (DeleteRequest) actionRequest; - assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); - assertEquals(DocWriteRequest.OpType.DELETE, request.opType()); - assertEquals(datasource.getName(), request.id()); - assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy()); - - DeleteResponse response = mock(DeleteResponse.class); - when(response.status()).thenReturn(RestStatus.OK); - return response; - } else { - throw new RuntimeException("Not expected request type is passed" + actionRequest.getClass()); - } - } - ); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + // Verify + assertTrue(actionRequest instanceof DeleteRequest); + DeleteRequest request = (DeleteRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(DocWriteRequest.OpType.DELETE, request.opType()); + assertEquals(datasource.getName(), request.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy()); + + DeleteResponse response = mock(DeleteResponse.class); + when(response.status()).thenReturn(RestStatus.OK); + return response; + }); // Run datasourceFacade.deleteDatasource(datasource); } + public void testDeleteDatasource_whenIndexNotFound_thenThrowException() { + Datasource datasource = randomDatasource(); + verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { + DeleteResponse response = mock(DeleteResponse.class); + when(response.status()).thenReturn(RestStatus.NOT_FOUND); + return response; + }); + + // Run + expectThrows(ResourceNotFoundException.class, () -> datasourceFacade.deleteDatasource(datasource)); + } + public void testGetDatasources_whenValidInput_thenSucceed() { List datasources = Arrays.asList(randomDatasource(), randomDatasource()); String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java index 74d5f189..6ddac0a5 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/GeoIpDataFacadeTests.java @@ -51,7 +51,7 @@ import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; -import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.Strings; import org.opensearch.common.SuppressForbidden; @@ -168,15 +168,14 @@ public void testInternalGetDatabaseReader_whenCalled_thenSetUserAgent() { verify(connection).addRequestProperty(Constants.USER_AGENT_KEY, Constants.USER_AGENT_VALUE); } - public void testDeleteIp2GeoDataIndex() { + public void testDeleteIp2GeoDataIndex_whenCalled_thenDeleteIndex() { String index = String.format(Locale.ROOT, "%s.%s", IP2GEO_DATA_INDEX_NAME_PREFIX, GeospatialTestHelper.randomLowerCaseString()); verifyingClient.setExecuteVerifier((actionResponse, actionRequest) -> { assertTrue(actionRequest instanceof DeleteIndexRequest); DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; assertEquals(1, request.indices().length); assertEquals(index, request.indices()[0]); - assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); - return null; + return new AcknowledgedResponse(true); }); verifyingGeoIpDataFacade.deleteIp2GeoDataIndex(index); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java index f650fc98..b110ddea 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/jobscheduler/DatasourceUpdateServiceTests.java @@ -28,7 +28,6 @@ import org.apache.commons.csv.CSVParser; import org.junit.Before; import org.opensearch.OpenSearchException; -import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.SuppressForbidden; import org.opensearch.geospatial.GeospatialTestHelper; import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; @@ -199,13 +198,13 @@ public void testDeleteUnusedIndices_whenValidInput_thenSucceed() { when(metadata.hasIndex(currentIndex)).thenReturn(true); when(metadata.hasIndex(oldIndex)).thenReturn(true); when(metadata.hasIndex(lingeringIndex)).thenReturn(false); - when(geoIpDataFacade.deleteIp2GeoDataIndex(any())).thenReturn(new AcknowledgedResponse(true)); datasourceUpdateService.deleteUnusedIndices(datasource); assertEquals(1, datasource.getIndices().size()); assertEquals(currentIndex, datasource.getIndices().get(0)); verify(datasourceFacade).updateDatasource(datasource); + verify(geoIpDataFacade).deleteIp2GeoDataIndex(oldIndex); } public void testUpdateDatasource_whenNoChange_thenNoUpdate() { diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java index ff2cd3e3..a72a5a1b 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/listener/Ip2GeoListenerTests.java @@ -36,7 +36,7 @@ public class Ip2GeoListenerTests extends Ip2GeoTestCase { @Before public void init() { - ip2GeoListener = new Ip2GeoListener(clusterService, threadPool, datasourceFacade); + ip2GeoListener = new Ip2GeoListener(clusterService, threadPool, datasourceFacade, geoIpDataFacade); } public void testDoStart_whenClusterManagerNode_thenAddListener() { @@ -170,4 +170,30 @@ public void testClusterChanged_whenDatasourceIndexIsRestored_thenUpdate() { verify(datasourceFacade).updateDatasource(eq(datasources), any()); } + public void testClusterChanged_whenGeoIpDataIsRestored_thenDelete() { + Datasource datasource = randomDatasource(); + SnapshotId snapshotId = new SnapshotId(GeospatialTestHelper.randomLowerCaseString(), GeospatialTestHelper.randomLowerCaseString()); + Snapshot snapshot = new Snapshot(GeospatialTestHelper.randomLowerCaseString(), snapshotId); + RestoreInProgress.Entry entry = new RestoreInProgress.Entry( + GeospatialTestHelper.randomLowerCaseString(), + snapshot, + RestoreInProgress.State.SUCCESS, + Arrays.asList(datasource.currentIndexName()), + null + ); + RestoreInProgress restoreInProgress = new RestoreInProgress.Builder().add(entry).build(); + ClusterState clusterState = mock(ClusterState.class); + when(clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).thenReturn(restoreInProgress); + ClusterChangedEvent event = mock(ClusterChangedEvent.class); + when(event.localNodeClusterManager()).thenReturn(true); + when(event.state()).thenReturn(clusterState); + + // Run + ip2GeoListener.clusterChanged(event); + + // Verify + verify(threadPool).generic(); + verify(geoIpDataFacade).deleteIp2GeoDataIndex(Arrays.asList(datasource.currentIndexName())); + } + }