Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements delete datasource API #291

Merged
merged 1 commit into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.master.AcknowledgedResponse;

/**
 * Ip2Geo datasource delete action
 *
 * Transport action type for deleting an Ip2Geo datasource. Registered as a
 * cluster-admin action; the response is a plain acknowledgement.
 */
public class DeleteDatasourceAction extends ActionType<AcknowledgedResponse> {
    /**
     * Delete datasource action instance (singleton; the constructor is private)
     */
    public static final DeleteDatasourceAction INSTANCE = new DeleteDatasourceAction();
    /**
     * Delete datasource action name, used to route transport requests to
     * {@code DeleteDatasourceTransportAction}
     */
    public static final String NAME = "cluster:admin/geospatial/datasource/delete";

    // Private: all users share INSTANCE.
    private DeleteDatasourceAction() {
        super(NAME, AcknowledgedResponse::new);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;

/**
 * GeoIP datasource delete request
 *
 * Carries the name of the datasource to delete. Lombok generates the
 * accessors and the all-args constructor.
 */
@Getter
@Setter
@AllArgsConstructor
public class DeleteDatasourceRequest extends ActionRequest {
    /**
     * @param name the datasource name
     * @return the datasource name
     */
    private String name;

    /**
     * Constructor deserializing the request from a transport stream.
     *
     * @param in the stream input
     * @throws IOException IOException
     */
    public DeleteDatasourceRequest(final StreamInput in) throws IOException {
        super(in);
        this.name = in.readString();
    }

    @Override
    public ActionRequestValidationException validate() {
        // A null or blank name can never identify a datasource; reject it up front.
        if (name != null && name.isBlank() == false) {
            return null;
        }
        final ActionRequestValidationException errors = new ActionRequestValidationException();
        errors.addValidationError("Datasource name should not be empty");
        return errors;
    }

    @Override
    public void writeTo(final StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(name);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import java.io.IOException;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.common.inject.Inject;
import org.opensearch.geospatial.annotation.VisibleForTesting;
import org.opensearch.geospatial.ip2geo.common.DatasourceFacade;
import org.opensearch.geospatial.ip2geo.common.DatasourceState;
import org.opensearch.geospatial.ip2geo.common.Ip2GeoLockService;
import org.opensearch.geospatial.ip2geo.jobscheduler.Datasource;
import org.opensearch.geospatial.ip2geo.processor.Ip2GeoProcessor;
import org.opensearch.ingest.IngestMetadata;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport action to delete datasource
*/
@Log4j2
public class DeleteDatasourceTransportAction extends HandledTransportAction<DeleteDatasourceRequest, AcknowledgedResponse> {
private static final long LOCK_DURATION_IN_SECONDS = 300l;
private final Ip2GeoLockService lockService;
private final IngestService ingestService;
private final DatasourceFacade datasourceFacade;

/**
* Constructor
* @param transportService the transport service
* @param actionFilters the action filters
* @param lockService the lock service
* @param ingestService the ingest service
* @param datasourceFacade the datasource facade
*/
@Inject
public DeleteDatasourceTransportAction(
final TransportService transportService,
final ActionFilters actionFilters,
final Ip2GeoLockService lockService,
final IngestService ingestService,
final DatasourceFacade datasourceFacade
) {
super(DeleteDatasourceAction.NAME, transportService, actionFilters, DeleteDatasourceRequest::new);
this.lockService = lockService;
this.ingestService = ingestService;
this.datasourceFacade = datasourceFacade;
}

/**
* We delete datasource regardless of its state as long as we can acquire a lock
*
* @param task the task
* @param request the request
* @param listener the listener
*/
@Override
protected void doExecute(final Task task, final DeleteDatasourceRequest request, final ActionListener<AcknowledgedResponse> listener) {
lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> {
if (lock == null) {
listener.onFailure(new OpenSearchException("another processor is holding a lock on the resource. Try again later"));
return;
}
try {
deleteDatasource(request.getName());
jmazanec15 marked this conversation as resolved.
Show resolved Hide resolved
listener.onResponse(new AcknowledgedResponse(true));
} catch (Exception e) {
listener.onFailure(e);
} finally {
lockService.releaseLock(
lock,
ActionListener.wrap(
released -> { log.info("Released lock for datasource[{}]", request.getName()); },
exception -> { log.error("Failed to release the lock", exception); }
)
);
}
}, exception -> { listener.onFailure(exception); }));
}

@VisibleForTesting
protected void deleteDatasource(final String datasourceName) throws IOException {
Datasource datasource = datasourceFacade.getDatasource(datasourceName);
if (datasource == null) {
throw new ResourceNotFoundException("no such datasource exist");
}

setDatasourceStateAsDeleting(datasource);
datasourceFacade.deleteDatasource(datasource);
}

private void setDatasourceStateAsDeleting(final Datasource datasource) throws IOException {
if (isSafeToDelete(datasource) == false) {
throw new OpenSearchException("datasource is being used by one of processors");
}

DatasourceState previousState = datasource.getState();
datasource.setState(DatasourceState.DELETING);
datasourceFacade.updateDatasource(datasource);

// Check again as processor might just have been created.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there not a write lock that can be grabbed to delete the datasource?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't implemented lock between create ip2geo processor and delete datasource.
Appreciate if you could share some idea on the write lock. If write lock involves interacting with index, it cannot be used during ip2geo processor creation phase because processor creation is a process which changes cluster state and while changing cluster state, we cannot call some or all of OpenSearch API like getting or searching document from an index.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmmm yes Im not sure what the lock would be. I would think there would be something with the JobScheduler LockService.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock with job scheduler utilize opensearch index which cannot be used in processor creation time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like utilizing cluster state might be the best way to avoid this.

In k-NN, we do something similar with the ModelGraveyard: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/indices/ModelGraveyard.java. Basically, we add model IDs that are being deleted to a "ModelGraveyard" that is stored in the cluster state.

ref: opensearch-project/k-NN#424. @naveentatikonda might be a good person to talk to more about this - he implemented it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The thing is, we didn't want to add any custom metadata during design review meeting as it can cause a headache in the future due to many restriction on maintaining backward compatibility.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, might be something to consider in the future if this becomes a problem.

// If it fails to update the state back to the previous state, the new processor
// will fail to convert an ip to a geo data.
// In such case, user have to delete the processor and delete this datasource again.
if (isSafeToDelete(datasource) == false) {
datasource.setState(previousState);
datasourceFacade.updateDatasource(datasource);
throw new OpenSearchException("datasource is being used by one of processors");
}
}

private boolean isSafeToDelete(Datasource datasource) {
IngestMetadata ingestMetadata = ingestService.getClusterService().state().getMetadata().custom(IngestMetadata.TYPE);
return ingestMetadata.getPipelines()
.keySet()
.stream()
.flatMap(pipelineId -> ingestService.getProcessorsInPipeline(pipelineId, Ip2GeoProcessor.class).stream())
.filter(ip2GeoProcessor -> ip2GeoProcessor.getDatasourceName().equals(datasource.getName()))
.findAny()
.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class GetDatasourceAction extends ActionType<GetDatasourceResponse> {
*/
public static final GetDatasourceAction INSTANCE = new GetDatasourceAction();
/**
* Name of a get datasource action
* Get datasource action name
*/
public static final String NAME = "cluster:admin/geospatial/datasource/get";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.geospatial.ip2geo.action;

import static org.opensearch.geospatial.shared.URLBuilder.URL_DELIMITER;
import static org.opensearch.geospatial.shared.URLBuilder.getPluginURLPrefix;
import static org.opensearch.rest.RestRequest.Method.DELETE;

import java.util.List;
import java.util.Locale;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

/**
 * Rest handler for Ip2Geo datasource delete request
 *
 * Exposes {@code DELETE <plugin prefix>/ip2geo/datasource/{name}} and forwards
 * the request to {@code DeleteDatasourceAction}.
 */
public class RestDeleteDatasourceHandler extends BaseRestHandler {
    private static final String ACTION_NAME = "ip2geo_datasource_delete";
    private static final String PARAMS_NAME = "name";

    @Override
    public String getName() {
        return ACTION_NAME;
    }

    @Override
    protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
        // The datasource name comes from the {name} path parameter.
        final DeleteDatasourceRequest deleteDatasourceRequest = new DeleteDatasourceRequest(request.param(PARAMS_NAME));
        return channel -> client.executeLocally(
            DeleteDatasourceAction.INSTANCE,
            deleteDatasourceRequest,
            new RestToXContentListener<>(channel)
        );
    }

    @Override
    public List<Route> routes() {
        final String path = String.join(
            URL_DELIMITER,
            getPluginURLPrefix(),
            String.format(Locale.ROOT, "ip2geo/datasource/{%s}", PARAMS_NAME)
        );
        return List.of(new Route(DELETE, path));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
import java.util.Objects;
import java.util.stream.Collectors;

import javax.swing.*;

import lombok.extern.log4j.Log4j2;

import org.opensearch.OpenSearchException;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.StepListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.get.MultiGetItemResponse;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -52,6 +53,7 @@
import org.opensearch.geospatial.shared.StashedThreadContext;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.rest.RestStatus;
import org.opensearch.search.SearchHit;

/**
Expand Down Expand Up @@ -164,6 +166,38 @@ public void putDatasource(final Datasource datasource, final ActionListener list
});
}

/**
* Delete datasource in an index {@code DatasourceExtension.JOB_INDEX_NAME}
*
* @param datasource the datasource
*
*/
public void deleteDatasource(final Datasource datasource) {
if (client.admin()
.indices()
.prepareDelete(datasource.getIndices().toArray(new String[0]))
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT))
.isAcknowledged() == false) {
throw new OpenSearchException("failed to delete data[{}] in datasource", String.join(",", datasource.getIndices()));
}
DeleteResponse response = client.prepareDelete()
.setIndex(DatasourceExtension.JOB_INDEX_NAME)
.setId(datasource.getName())
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.execute()
.actionGet(clusterSettings.get(Ip2GeoSettings.TIMEOUT));

if (response.status().equals(RestStatus.OK)) {
log.info("deleted datasource[{}] successfully", datasource.getName());
} else if (response.status().equals(RestStatus.NOT_FOUND)) {
throw new ResourceNotFoundException("datasource[{}] does not exist", datasource.getName());
} else {
throw new OpenSearchException("failed to delete datasource[{}] with status[{}]", datasource.getName(), response.status());
}
heemin32 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Get datasource from an index {@code DatasourceExtension.JOB_INDEX_NAME}
* @param name the name of a datasource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.ActionListener;
Expand All @@ -44,6 +45,10 @@ public final class Ip2GeoProcessor extends AbstractProcessor {
private static final String PROPERTY_IP = "ip";
private final String field;
private final String targetField;
/**
* @return The datasource name
*/
@Getter
private final String datasourceName;
private final Set<String> properties;
private final boolean ignoreMissing;
Expand Down
Loading