From 999046abada0951d137314ccc688658e0dc2a6cc Mon Sep 17 00:00:00 2001 From: Heemin Kim Date: Fri, 5 May 2023 13:18:23 -0700 Subject: [PATCH] Implements delete datasource API Signed-off-by: Heemin Kim --- .../ip2geo/action/DeleteDatasourceAction.java | 27 +++ .../action/DeleteDatasourceRequest.java | 65 ++++++ .../DeleteDatasourceTransportAction.java | 134 ++++++++++++ .../ip2geo/action/GetDatasourceAction.java | 2 +- .../action/RestDeleteDatasourceHandler.java | 49 +++++ .../ip2geo/common/DatasourceFacade.java | 38 +++- .../ip2geo/processor/Ip2GeoProcessor.java | 5 + .../geospatial/plugin/GeospatialPlugin.java | 9 +- .../geospatial/ip2geo/Ip2GeoTestCase.java | 9 + .../action/DeleteDatasourceRequestTests.java | 30 +++ .../DeleteDatasourceTransportActionTests.java | 202 ++++++++++++++++++ .../RestDeleteDatasourceHandlerTests.java | 49 +++++ .../ip2geo/common/DatasourceFacadeTests.java | 38 ++++ .../plugin/GeospatialPluginTests.java | 4 +- 14 files changed, 655 insertions(+), 6 deletions(-) create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequest.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java create mode 100644 src/main/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandler.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java create mode 100644 src/test/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandlerTests.java diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceAction.java new file mode 100644 index 00000000..b08e0861 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceAction.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import org.opensearch.action.ActionType; +import org.opensearch.action.support.master.AcknowledgedResponse; + +/** + * Ip2Geo datasource delete action + */ +public class DeleteDatasourceAction extends ActionType { + /** + * Delete datasource action instance + */ + public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction(); + /** + * Delete datasource action name + */ + public static final String NAME = "cluster:admin/geospatial/datasource/delete"; + + private DeleteDatasourceAction() { + super(NAME, AcknowledgedResponse::new); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequest.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequest.java new file mode 100644 index 00000000..07d393fe --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequest.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; + +import lombok.Getter; +import lombok.Setter; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +/** + * GeoIP datasource delete request + */ +@Getter +@Setter +public class DeleteDatasourceRequest extends ActionRequest { + /** + * @param name the datasource name + * @return the datasource name + */ + private String name; + + /** + * Constructor + * + * @param name list of datasource name + */ + public DeleteDatasourceRequest(final String name) { + this.name = name; + } + + /** + * Constructor + * + * @param in the stream input + * @throws IOException IOException + */ + public DeleteDatasourceRequest(final StreamInput in) throws IOException { + super(in); + this.name = in.readString(); + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException errors = null; + if (name == null || name.isBlank()) { + errors = new ActionRequestValidationException(); + errors.addValidationError("Datasource name should not be empty"); + } + return errors; + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java new file mode 100644 index 00000000..2acdfd52 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportAction.java @@ -0,0 +1,134 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import java.io.IOException; + +import lombok.extern.log4j.Log4j2; + +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.inject.Inject; +import org.opensearch.geospatial.annotation.VisibleForTesting; +import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; +import org.opensearch.ingest.IngestMetadata; +import org.opensearch.ingest.IngestService; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; + +/** + * Transport action to delete datasource + */ +@Log4j2 +public class DeleteDatasourceTransportAction extends HandledTransportAction { + private static final long LOCK_DURATION_IN_SECONDS = 300l; + private final Ip2GeoLockService lockService; + private final IngestService ingestService; + private final DatasourceFacade datasourceFacade; + + /** + * Constructor + * @param transportService the transport service + * @param actionFilters the action filters + * @param lockService the lock service + * @param ingestService the ingest service + * @param datasourceFacade the datasource facade + */ + @Inject + public DeleteDatasourceTransportAction( + final TransportService transportService, + final ActionFilters actionFilters, + final Ip2GeoLockService lockService, + final IngestService ingestService, + final DatasourceFacade datasourceFacade + ) { + super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new); + this.lockService = lockService; + this.ingestService = ingestService; + this.datasourceFacade = datasourceFacade; + } + + /** + * We delete datasource regardless of its state as long as we can acquire a lock + * + * @param task the task + * @param request the request + * @param listener the listener + */ + @Override + protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener listener) { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later")); + return; + } + try { + deleteDatasource(request.getName()); + listener.onResponse(new AcknowledgedResponse(true)); + } catch (Exception e) { + listener.onFailure(e); + } finally { + lockService.releaseLock( + lock, + ActionListener.wrap( + released -> { log.info("Released lock for datasource[{}]", request.getName()); }, + exception -> { log.error("Failed to release the lock", exception); } + ) + ); + } + }, exception -> { listener.onFailure(exception); })); + } + + @VisibleForTesting + protected void deleteDatasource(final String datasourceName) throws IOException { + Datasource datasource = datasourceFacade.getDatasource(datasourceName); + if (datasource == null) { + throw new ResourceNotFoundException("no such datasource exist"); + } + + setDatasourceStateAsDeleting(datasource); + datasourceFacade.deleteDatasource(datasource); + } + + private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException { + if (isSafeToDelete(datasource) == false) { + throw new OpenSearchException("datasource is being used by one of processors"); + } + + DatasourceState previousState = datasource.getState(); + datasource.setState(DatasourceState.DELETING); + datasourceFacade.updateDatasource(datasource); + + // Check again as processor might just have been created. + // If it fails to update the state back to the previous state, the new processor + // will fail to convert an ip to a geo data. + // In such case, user have to delete the processor and delete this datasource again. + if (isSafeToDelete(datasource) == false) { + datasource.setState(previousState); + datasourceFacade.updateDatasource(datasource); + throw new OpenSearchException("datasource is being used by one of processors"); + } + } + + private boolean isSafeToDelete(Datasource datasource) { + IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE); + return ingestMetadata.getPipelines() + .keySet() + .stream() + .flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream()) + .filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName())) + .findAny() + .isEmpty(); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java index 7771fd30..039ab35b 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/GetDatasourceAction.java @@ -16,7 +16,7 @@ public class GetDatasourceAction extends ActionType { */ public static final GetDatasourceAction INSTANCE = new GetDatasourceAction(); /** - * Name of a get datasource action + * Get datasource action name */ public static final String NAME = "cluster:admin/geospatial/datasource/get"; diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandler.java b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandler.java new file mode 100644 index 00000000..dc2dd117 --- /dev/null +++ b/src/main/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandler.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; +import static org.opensearch.rest.RestRequest.Method.DELETE; + +import java.util.List; +import java.util.Locale; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +/** + * Rest handler for Ip2Geo datasource delete request + */ +public class RestDeleteDatasourceHandler extends BaseRestHandler { + private static final String ACTION_NAME = "ip2geo_datasource_delete"; + private static final String PARAMS_NAME = "name"; + + @Override + public String getName() { + return ACTION_NAME; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) { + final String name = request.param(PARAMS_NAME); + final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(name); + + return channel -> client.executeLocally( + DeleteDatasourceAction.INSTANCE, + deleteDatasourceRequest, + new RestToXContentListener<>(channel) + ); + } + + @Override + public List routes() { + String path = String.join(URL_DELIMITER, getPluginURLPrefix(), String.format(Locale.ROOT, "ip2geo/datasource/{%s}", PARAMS_NAME)); + return List.of(new Route(DELETE, path)); + } +} diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java index 6e03a937..a14a9c72 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacade.java @@ -18,23 +18,24 @@ import java.util.Objects; import java.util.stream.Collectors; -import javax.swing.*; - import lombok.extern.log4j.Log4j2; import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -52,6 +53,7 @@ import org.opensearch.geospatial.shared.StashedThreadContext; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.QueryBuilders; +import org.opensearch.rest.RestStatus; import org.opensearch.search.SearchHit; /** @@ -164,6 +166,38 @@ public void putDatasource(final Datasource datasource, final ActionListener list }); } + /** + * Delete datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME} + * + * @param datasource the datasource + * + */ + public void deleteDatasource(final Datasource datasource) { + if (client.admin() + .indices() + .prepareDelete(datasource.getIndices().toArray(new String[0])) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .execute() + .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)) + .isAcknowledged() == false) { + throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices())); + } + DeleteResponse response = client.prepareDelete() + .setIndex(DatasourceExtension.JOB_INDEX_NAME) + .setId(datasource.getName()) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .execute() + .actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT)); + + if (response.status().equals(RestStatus.OK)) { + log.info("deleted datasource[{}] successfully", datasource.getName()); + } else if (response.status().equals(RestStatus.NOT_FOUND)) { + throw new ResourceNotFoundException("datasource[{}] does not exist", datasource.getName()); + } else { + throw new OpenSearchException("failed to delete datasource[{}] with status[{}]", datasource.getName(), response.status()); + } + } + /** * Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME} * @param name the name of a datasource diff --git a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java index 99a29865..acd56150 100644 --- a/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java +++ b/src/main/java/org/opensearch/geospatial/ip2geo/processor/Ip2GeoProcessor.java @@ -20,6 +20,7 @@ import java.util.function.BiConsumer; import java.util.stream.Collectors; +import lombok.Getter; import lombok.extern.log4j.Log4j2; import org.opensearch.action.ActionListener; @@ -44,6 +45,10 @@ public final class Ip2GeoProcessor extends AbstractProcessor { private static final String PROPERTY_IP = "ip"; private final String field; private final String targetField; + /** + * @return The datasource name + */ + @Getter private final String datasourceName; private final Set properties; private final boolean ignoreMissing; diff --git a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java index ff54aed6..5bca7b3d 100644 --- a/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java +++ b/src/main/java/org/opensearch/geospatial/plugin/GeospatialPlugin.java @@ -36,10 +36,13 @@ import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldMapper; import org.opensearch.geospatial.index.mapper.xyshape.XYShapeFieldTypeParser; import org.opensearch.geospatial.index.query.xyshape.XYShapeQueryBuilder; +import org.opensearch.geospatial.ip2geo.action.DeleteDatasourceAction; +import org.opensearch.geospatial.ip2geo.action.DeleteDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.GetDatasourceAction; import org.opensearch.geospatial.ip2geo.action.GetDatasourceTransportAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceAction; import org.opensearch.geospatial.ip2geo.action.PutDatasourceTransportAction; +import org.opensearch.geospatial.ip2geo.action.RestDeleteDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -153,7 +156,8 @@ public List getRestHandlers( new RestUploadStatsAction(), new RestUploadGeoJSONAction(), new RestPutDatasourceHandler(clusterSettings), - new RestGetDatasourceHandler() + new RestGetDatasourceHandler(), + new RestDeleteDatasourceHandler() ); } @@ -163,7 +167,8 @@ public List getRestHandlers( new ActionHandler<>(UploadGeoJSONAction.INSTANCE, UploadGeoJSONTransportAction.class), new ActionHandler<>(UploadStatsAction.INSTANCE, UploadStatsTransportAction.class), new ActionHandler<>(PutDatasourceAction.INSTANCE, PutDatasourceTransportAction.class), - new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class) + new ActionHandler<>(GetDatasourceAction.INSTANCE, GetDatasourceTransportAction.class), + new ActionHandler<>(DeleteDatasourceAction.INSTANCE, DeleteDatasourceTransportAction.class) ); } diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java index ee57b5ba..a278ebf6 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/Ip2GeoTestCase.java @@ -13,6 +13,7 @@ import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Locale; import java.util.concurrent.ExecutorService; @@ -43,9 +44,11 @@ import org.opensearch.geospatial.ip2geo.common.DatasourceState; import org.opensearch.geospatial.ip2geo.common.GeoIpDataFacade; import org.opensearch.geospatial.ip2geo.common.Ip2GeoExecutor; +import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService; import org.opensearch.geospatial.ip2geo.common.Ip2GeoSettings; import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; import org.opensearch.geospatial.ip2geo.jobscheduler.DatasourceUpdateService; +import org.opensearch.ingest.IngestMetadata; import org.opensearch.ingest.IngestService; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; import org.opensearch.jobscheduler.spi.utils.LockService; @@ -81,6 +84,9 @@ public abstract class Ip2GeoTestCase extends RestActionTestCase { protected ThreadPool threadPool; @Mock protected TransportService transportService; + @Mock + protected Ip2GeoLockService ip2GeoLockService; + protected IngestMetadata ingestMetadata; protected NoOpNodeClient client; protected VerifyingClient verifyingClient; protected LockService lockService; @@ -96,10 +102,13 @@ public void prepareIp2GeoTestCase() { verifyingClient = spy(new VerifyingClient(this.getTestName())); clusterSettings = new ClusterSettings(settings, new HashSet<>(Ip2GeoSettings.settings())); lockService = new LockService(client, clusterService); + ingestMetadata = new IngestMetadata(Collections.emptyMap()); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); when(clusterService.getSettings()).thenReturn(Settings.EMPTY); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); when(clusterService.state()).thenReturn(clusterState); when(clusterState.metadata()).thenReturn(metadata); + when(clusterState.getMetadata()).thenReturn(metadata); when(clusterState.routingTable()).thenReturn(RoutingTable.EMPTY_ROUTING_TABLE); when(ip2GeoExecutor.forDatasourceUpdate()).thenReturn(executorService); when(ingestService.getClusterService()).thenReturn(clusterService); diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java new file mode 100644 index 00000000..8bd84924 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceRequestTests.java @@ -0,0 +1,30 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import lombok.SneakyThrows; + +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; + +public class DeleteDatasourceRequestTests extends Ip2GeoTestCase { + @SneakyThrows + public void testStreamInOut_whenValidInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasourceName); + + // Run + BytesStreamOutput output = new BytesStreamOutput(); + request.writeTo(output); + BytesStreamInput input = new BytesStreamInput(output.bytes().toBytesRef().bytes); + DeleteDatasourceRequest copiedRequest = new DeleteDatasourceRequest(input); + + // Verify + assertEquals(request.getName(), copiedRequest.getName()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java new file mode 100644 index 00000000..9228d1cd --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/DeleteDatasourceTransportActionTests.java @@ -0,0 +1,202 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import lombok.SneakyThrows; + +import org.junit.Before; +import org.mockito.ArgumentCaptor; +import org.opensearch.OpenSearchException; +import org.opensearch.ResourceNotFoundException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.geospatial.ip2geo.Ip2GeoTestCase; +import org.opensearch.geospatial.ip2geo.common.DatasourceState; +import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource; +import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor; +import org.opensearch.ingest.IngestMetadata; +import org.opensearch.ingest.PipelineConfiguration; +import org.opensearch.jobscheduler.spi.LockModel; +import org.opensearch.tasks.Task; + +public class DeleteDatasourceTransportActionTests extends Ip2GeoTestCase { + private DeleteDatasourceTransportAction action; + + @Before + public void init() { + action = new DeleteDatasourceTransportAction(transportService, actionFilters, ip2GeoLockService, ingestService, datasourceFacade); + } + + @SneakyThrows + public void testDoExecute_whenFailedToAcquireLock_thenError() { + validateDoExecute(null, null); + } + + @SneakyThrows + public void testDoExecute_whenValidInput_thenSucceed() { + String jobIndexName = GeospatialTestHelper.randomLowerCaseString(); + String jobId = GeospatialTestHelper.randomLowerCaseString(); + LockModel lockModel = new LockModel(jobIndexName, jobId, Instant.now(), randomPositiveLong(), false); + validateDoExecute(lockModel, null); + } + + @SneakyThrows + public void testDoExecute_whenException_thenError() { + validateDoExecute(null, new RuntimeException()); + } + + private void validateDoExecute(final LockModel lockModel, final Exception exception) throws IOException { + Task task = mock(Task.class); + Datasource datasource = randomDatasource(); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + DeleteDatasourceRequest request = new DeleteDatasourceRequest(datasource.getName()); + ActionListener listener = mock(ActionListener.class); + + // Run + action.doExecute(task, request, listener); + + // Verify + ArgumentCaptor> captor = ArgumentCaptor.forClass(ActionListener.class); + verify(ip2GeoLockService).acquireLock(eq(datasource.getName()), anyLong(), captor.capture()); + + if (exception == null) { + // Run + captor.getValue().onResponse(lockModel); + + // Verify + if (lockModel == null) { + verify(listener).onFailure(any(OpenSearchException.class)); + } else { + verify(listener).onResponse(new AcknowledgedResponse(true)); + verify(ip2GeoLockService).releaseLock(eq(lockModel), any(ActionListener.class)); + } + } else { + // Run + captor.getValue().onFailure(exception); + // Verify + verify(listener).onFailure(exception); + } + } + + @SneakyThrows + public void testDeleteDatasource_whenNull_thenThrowException() { + Datasource datasource = randomDatasource(); + expectThrows(ResourceNotFoundException.class, () -> action.deleteDatasource(datasource.getName())); + } + + @SneakyThrows + public void testDeleteDatasource_whenSafeToDelete_thenDelete() { + Datasource datasource = randomDatasource(); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + + // Run + action.deleteDatasource(datasource.getName()); + + // Verify + assertEquals(DatasourceState.DELETING, datasource.getState()); + verify(datasourceFacade).updateDatasource(datasource); + verify(datasourceFacade).deleteDatasource(datasource); + } + + @SneakyThrows + public void testDeleteDatasource_whenProcessorIsUsingDatasource_thenThrowException() { + Datasource datasource = randomDatasource(); + datasource.setState(DatasourceState.AVAILABLE); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + + String pipelineId = GeospatialTestHelper.randomLowerCaseString(); + Map pipelines = new HashMap<>(); + pipelines.put(pipelineId, createPipelineConfiguration()); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); + when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn( + Arrays.asList(createIp2GeoProcesor(datasource.getName())) + ); + + // Run + expectThrows(OpenSearchException.class, () -> action.deleteDatasource(datasource.getName())); + + // Verify + assertEquals(DatasourceState.AVAILABLE, datasource.getState()); + verify(datasourceFacade, never()).updateDatasource(datasource); + verify(datasourceFacade, never()).deleteDatasource(datasource); + } + + @SneakyThrows + public void testDeleteDatasource_whenProcessorIsCreatedDuringDeletion_thenThrowException() { + Datasource datasource = randomDatasource(); + datasource.setState(DatasourceState.AVAILABLE); + when(datasourceFacade.getDatasource(datasource.getName())).thenReturn(datasource); + + String pipelineId = GeospatialTestHelper.randomLowerCaseString(); + Map pipelines = new HashMap<>(); + pipelines.put(pipelineId, createPipelineConfiguration()); + IngestMetadata ingestMetadata = new IngestMetadata(pipelines); + when(metadata.custom(IngestMetadata.TYPE)).thenReturn(ingestMetadata); + when(ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class)).thenReturn( + Collections.emptyList(), + Arrays.asList(createIp2GeoProcesor(datasource.getName())) + ); + + // Run + expectThrows(OpenSearchException.class, () -> action.deleteDatasource(datasource.getName())); + + // Verify + verify(datasourceFacade, times(2)).updateDatasource(datasource); + verify(datasourceFacade, never()).deleteDatasource(datasource); + } + + private PipelineConfiguration createPipelineConfiguration() { + String id = GeospatialTestHelper.randomLowerCaseString(); + ByteBuffer byteBuffer = ByteBuffer.wrap(GeospatialTestHelper.randomLowerCaseString().getBytes(StandardCharsets.US_ASCII)); + BytesReference config = BytesReference.fromByteBuffer(byteBuffer); + return new PipelineConfiguration(id, config, XContentType.JSON); + } + + private Ip2GeoProcessor createIp2GeoProcesor(String datasourceName) { + String tag = GeospatialTestHelper.randomLowerCaseString(); + String description = GeospatialTestHelper.randomLowerCaseString(); + String field = GeospatialTestHelper.randomLowerCaseString(); + String targetField = GeospatialTestHelper.randomLowerCaseString(); + Set properties = Set.of(GeospatialTestHelper.randomLowerCaseString()); + Ip2GeoProcessor ip2GeoProcessor = new Ip2GeoProcessor( + tag, + description, + field, + targetField, + datasourceName, + properties, + true, + true, + clusterSettings, + datasourceFacade, + geoIpDataFacade + ); + return ip2GeoProcessor; + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandlerTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandlerTests.java new file mode 100644 index 00000000..937c5532 --- /dev/null +++ b/src/test/java/org/opensearch/geospatial/ip2geo/action/RestDeleteDatasourceHandlerTests.java @@ -0,0 +1,49 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.geospatial.ip2geo.action; + +import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER; +import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix; + +import java.util.Locale; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Before; +import org.opensearch.geospatial.GeospatialTestHelper; +import org.opensearch.rest.RestRequest; +import org.opensearch.test.rest.FakeRestRequest; +import org.opensearch.test.rest.RestActionTestCase; + +public class RestDeleteDatasourceHandlerTests extends RestActionTestCase { + private String path; + private RestDeleteDatasourceHandler action; + + @Before + public void setupAction() { + action = new RestDeleteDatasourceHandler(); + controller().registerHandler(action); + path = String.join(URL_DELIMITER, getPluginURLPrefix(), "ip2geo/datasource/%s"); + } + + public void testPrepareRequest_whenValidInput_thenSucceed() { + String datasourceName = GeospatialTestHelper.randomLowerCaseString(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.DELETE) + .withPath(String.format(Locale.ROOT, path, datasourceName)) + .build(); + AtomicBoolean isExecuted = new AtomicBoolean(false); + + verifyingClient.setExecuteLocallyVerifier((actionResponse, actionRequest) -> { + assertTrue(actionRequest instanceof DeleteDatasourceRequest); + DeleteDatasourceRequest deleteDatasourceRequest = (DeleteDatasourceRequest) actionRequest; + assertEquals(datasourceName, deleteDatasourceRequest.getName()); + isExecuted.set(true); + return null; + }); + + dispatchRequest(request); + assertTrue(isExecuted.get()); + } +} diff --git a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java index baacde79..d2ca28bb 100644 --- a/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java +++ b/src/test/java/org/opensearch/geospatial/ip2geo/common/DatasourceFacadeTests.java @@ -26,6 +26,9 @@ import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.delete.DeleteResponse; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.get.MultiGetItemResponse; @@ -34,7 +37,9 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; +import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.common.Randomness; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.xcontent.json.JsonXContent; @@ -45,6 +50,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.query.QueryBuilders; import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule; +import org.opensearch.rest.RestStatus; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; @@ -209,6 +215,38 @@ private Datasource setupClientForGetRequest(final boolean isExist, final Runtime return datasource; } + public void testDeleteDatasource_whenValidInput_thenSucceed() { + Datasource datasource = randomDatasource(); + verifyingClient.setExecuteVerifier( + (actionResponse, actionRequest) -> { + // Verify + if (actionRequest instanceof DeleteIndexRequest) { + DeleteIndexRequest request = (DeleteIndexRequest) actionRequest; + assertEquals(datasource.getIndices().size(), request.indices().length); + assertEquals(IndicesOptions.LENIENT_EXPAND_OPEN, request.indicesOptions()); + + AcknowledgedResponse response = new AcknowledgedResponse(true); + return response; + } else if (actionRequest instanceof DeleteRequest) { + DeleteRequest request = (DeleteRequest) actionRequest; + assertEquals(DatasourceExtension.JOB_INDEX_NAME, request.index()); + assertEquals(DocWriteRequest.OpType.DELETE, request.opType()); + assertEquals(datasource.getName(), request.id()); + assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, request.getRefreshPolicy()); + + DeleteResponse response = mock(DeleteResponse.class); + when(response.status()).thenReturn(RestStatus.OK); + return response; + } else { + throw new RuntimeException("Not expected request type is passed" + actionRequest.getClass()); + } + } + ); + + // Run + datasourceFacade.deleteDatasource(datasource); + } + public void testGetDatasources_whenValidInput_thenSucceed() { List datasources = Arrays.asList(randomDatasource(), randomDatasource()); String[] names = datasources.stream().map(Datasource::getName).toArray(String[]::new); diff --git a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java index 3917e7c7..b3050f60 100644 --- a/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java +++ b/src/test/java/org/opensearch/geospatial/plugin/GeospatialPluginTests.java @@ -32,6 +32,7 @@ import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.geospatial.action.upload.geojson.UploadGeoJSONAction; +import org.opensearch.geospatial.ip2geo.action.RestDeleteDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestGetDatasourceHandler; import org.opensearch.geospatial.ip2geo.action.RestPutDatasourceHandler; import org.opensearch.geospatial.ip2geo.common.DatasourceFacade; @@ -61,7 +62,8 @@ public class GeospatialPluginTests extends OpenSearchTestCase { new RestUploadGeoJSONAction(), new RestUploadStatsAction(), new RestPutDatasourceHandler(clusterSettings), - new RestGetDatasourceHandler() + new RestGetDatasourceHandler(), + new RestDeleteDatasourceHandler() ); private final Set SUPPORTED_SYSTEM_INDEX_PATTERN = Set.of(IP2GEO_DATA_INDEX_NAME_PREFIX);