Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rest layer changes for List all PITs and PIT segments #4388

Merged
merged 18 commits into from
Sep 25, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
changes as per new security model
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Sep 20, 2022
commit bb950d39498eb0c849c416425dbc63544f835b85
Original file line number Diff line number Diff line change
@@ -238,16 +238,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.NodesGetAllPitsAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
@@ -677,10 +675,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
@@ -859,7 +856,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction(nodesInCluster));
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));

Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
package org.opensearch.action.admin.indices.segments;

import org.opensearch.action.ActionListener;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitService;
import org.opensearch.action.search.SearchContextId;
import org.opensearch.action.search.SearchContextIdForNode;
@@ -45,6 +46,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.opensearch.action.search.SearchContextId.decode;

@@ -93,10 +95,11 @@ public TransportPitSegmentsAction(
@Override
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
List<String> pitIds = request.getPitIds();
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new IndicesSegmentResponse(new ShardSegments[] {}, 0, 0, 0, new ArrayList<>()));
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
pitService.getAllPits(ActionListener.wrap(response -> {
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
super.doExecute(task, request, listener);
}, listener::onFailure));
} else {
super.doExecute(task, request, listener);
}
Original file line number Diff line number Diff line change
@@ -21,22 +21,11 @@
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {

// Security plugin intercepts and sets the response with permitted PIT contexts
private GetAllPitNodesResponse getAllPitNodesResponse;

@Inject
public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) {
super(concreteNodes);
}

public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) {
this.getAllPitNodesResponse = getAllPitNodesResponse;
}

public GetAllPitNodesResponse getGetAllPitNodesResponse() {
return getAllPitNodesResponse;
}

public GetAllPitNodesRequest(StreamInput in) throws IOException {
super(in);
}
Original file line number Diff line number Diff line change
@@ -68,10 +68,10 @@ public GetAllPitNodesResponse(List<ListPitInfo> listPitInfos, GetAllPitNodesResp
public GetAllPitNodesResponse(
List<ListPitInfo> listPitInfos,
ClusterName clusterName,
List<GetAllPitNodeResponse> getAllPitNodeResponse,
List<GetAllPitNodeResponse> getAllPitNodeResponseList,
List<FailedNodeException> failures
) {
super(clusterName, getAllPitNodeResponse, failures);
super(clusterName, getAllPitNodeResponseList, failures);
pitInfos.addAll(listPitInfos);
}

Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
import org.opensearch.action.ActionType;

/**
* Action type for listing all PIT reader contexts
* Action type for retrieving all PIT reader contexts from nodes
*/
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();

This file was deleted.

Original file line number Diff line number Diff line change
@@ -170,16 +170,14 @@ public Map<String, String[]> getIndicesForPits(List<String> pitIds) {
/**
* Get all active point in time contexts
*/
public void getAllPits(GetAllPitNodesResponse getAllPitNodesResponse, ActionListener<GetAllPitNodesResponse> getAllPitsListener) {
public void getAllPits(ActionListener<GetAllPitNodesResponse> getAllPitsListener) {
final List<DiscoveryNode> nodes = new ArrayList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterService.state().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
nodes.add(node);
}
logger.debug("Number of active PITs in cluster: " + getAllPitNodesResponse.getPitInfos().size());
DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]);
GetAllPitNodesRequest getAllPitNodesRequest = new GetAllPitNodesRequest(disNodesArr);
getAllPitNodesRequest.setGetAllPitNodesResponse(getAllPitNodesResponse);
transportService.sendRequest(
transportService.getLocalNode(),
GetAllPitsAction.NAME,
Original file line number Diff line number Diff line change
@@ -11,7 +11,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.tasks.Task;
@@ -21,6 +20,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Transport action for deleting point in time searches - supports deleting list and all point in time searches
@@ -34,9 +34,6 @@ public TransportDeletePitAction(
TransportService transportService,
ActionFilters actionFilters,
NamedWriteableRegistry namedWriteableRegistry,
TransportSearchAction transportSearchAction,
ClusterService clusterService,
SearchTransportService searchTransportService,
PitService pitService
) {
super(DeletePitAction.NAME, transportService, actionFilters, DeletePitRequest::new);
@@ -50,10 +47,8 @@ public TransportDeletePitAction(
@Override
protected void doExecute(Task task, DeletePitRequest request, ActionListener<DeletePitResponse> listener) {
List<String> pitIds = request.getPitIds();
// when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty
// and in this case return empty response
if (pitIds.isEmpty()) {
listener.onResponse(new DeletePitResponse(new ArrayList<>()));
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
deleteAllPits(listener);
} else {
deletePits(listener, request);
}
@@ -75,4 +70,24 @@ private void deletePits(ActionListener<DeletePitResponse> listener, DeletePitReq
}
pitService.deletePitContexts(nodeToContextsMap, listener);
}

/**
* Delete all active PIT reader contexts leveraging list all PITs
*
* For Cross cluster PITs :
* - mixed cluster PITs ( PIT comprising local and remote ) will be fully deleted. Since there will atleast be
* one reader context with PIT ID present in local cluster, 'Get all PITs' will retrieve the PIT ID with which
* we can completely delete the PIT contexts in both local and remote cluster.
* - fully remote PITs will not be deleted as 'Get all PITs' operates on local cluster only and no PIT info can
* be retrieved when it's fully remote.
*/
private void deleteAllPits(ActionListener<DeletePitResponse> listener) {
// Get all PITs and execute delete operation for the PITs.
pitService.getAllPits(ActionListener.wrap(getAllPitNodesResponse -> {
DeletePitRequest deletePitRequest = new DeletePitRequest(
getAllPitNodesResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList())
);
deletePits(listener, deletePitRequest);
}, listener::onFailure));
}
}
Original file line number Diff line number Diff line change
@@ -8,29 +8,79 @@

package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.tasks.Task;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.search.SearchService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;

/**
* Transport action to get all active PIT contexts across the cluster
* Transport action to get all active PIT contexts across all nodes
*/
public class TransportGetAllPitsAction extends HandledTransportAction<GetAllPitNodesRequest, GetAllPitNodesResponse> {
private final PitService pitService;
public class TransportGetAllPitsAction extends TransportNodesAction<
GetAllPitNodesRequest,
GetAllPitNodesResponse,
GetAllPitNodeRequest,
GetAllPitNodeResponse> {
private final SearchService searchService;

@Inject
public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) {
super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in));
this.pitService = pitService;
public TransportGetAllPitsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
ActionFilters actionFilters,
SearchService searchService
) {
super(
GetAllPitsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
GetAllPitNodesRequest::new,
GetAllPitNodeRequest::new,
ThreadPool.Names.SAME,
GetAllPitNodeResponse.class
);
this.searchService = searchService;
}

@Override
protected GetAllPitNodesResponse newResponse(
GetAllPitNodesRequest request,
List<GetAllPitNodeResponse> getAllPitNodeResponses,
List<FailedNodeException> failures
) {
return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeResponses, failures);
}

@Override
protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) {
return new GetAllPitNodeRequest();
}

@Override
protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException {
return new GetAllPitNodeResponse(in);
}

protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener<GetAllPitNodesResponse> listener) {
// If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs
if (request.getGetAllPitNodesResponse() != null) {
listener.onResponse(request.getGetAllPitNodesResponse());
}
/**
* This retrieves all active PITs in the node
*/
@Override
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse(
transportService.getLocalNode(),
searchService.getAllPITReaderContexts()
);
return nodeResponse;
}
}
Loading