Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create PIT API #2745

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
469738f
create pit changes
bharath-techie Mar 24, 2022
0d1f337
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie Mar 29, 2022
751c320
two phase create pit
bharath-techie Mar 30, 2022
eba05e5
Adding unit tests
bharath-techie Apr 6, 2022
130a597
addressing review comments
bharath-techie Apr 9, 2022
c6f9398
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie Apr 12, 2022
87ec605
fixing ci, adding tests and java docs
bharath-techie Apr 12, 2022
f8f6367
Segregating create pit logic into separate controller
bharath-techie Apr 13, 2022
0fcd1df
Adding cleanup logic if create pit fails
bharath-techie Apr 14, 2022
3002c20
Merge branch 'opensearch-project:main' into createpit
bharath-techie Apr 22, 2022
1f6f466
Addressing comments
bharath-techie Apr 26, 2022
de5c4e4
Merge branch 'createpit' of github.com:bharath-techie/OpenSearch into…
bharath-techie Apr 26, 2022
3359986
Addressing review comments
bharath-techie Apr 29, 2022
232179e
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie May 2, 2022
730afa8
Adding java docs
bharath-techie May 3, 2022
4747018
Addressing comments and making pit naming uniform
bharath-techie May 6, 2022
7340feb
Changes to include rest high level client tests and addressing comments
bharath-techie May 9, 2022
eec2480
Addressing comments
bharath-techie May 10, 2022
b87f0fd
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie May 11, 2022
939fafc
Addressing comments
bharath-techie May 12, 2022
02c7537
Addressing comments
bharath-techie May 12, 2022
c6cef2b
Addressing comments
bharath-techie May 12, 2022
bd0105c
addressing comments
bharath-techie May 12, 2022
35d8cc4
addressing comments
bharath-techie May 13, 2022
11c5195
addressing comments
bharath-techie May 16, 2022
471a64a
addressing comments
bharath-techie May 17, 2022
47bb3e7
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie May 17, 2022
c28762e
fixing broken tests
bharath-techie May 17, 2022
f272405
Merge branch 'main' of https://github.com/opensearch-project/OpenSear…
bharath-techie May 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,12 @@
import org.opensearch.action.main.MainAction;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePITAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePITAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -400,6 +402,7 @@
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePITAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
Expand Down Expand Up @@ -660,6 +663,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(ImportDanglingIndexAction.INSTANCE, TransportImportDanglingIndexAction.class);
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
actions.register(CreatePITAction.INSTANCE, TransportCreatePITAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -832,6 +836,9 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestRepositoriesAction());
registerHandler.accept(new RestSnapshotAction());
registerHandler.accept(new RestTemplatesAction());

// Point in time API
registerHandler.accept(new RestCreatePITAction());
for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
public class CreatePITAction extends ActionType<CreatePITResponse> {
public static final CreatePITAction INSTANCE = new CreatePITAction();
public static final String NAME = "indices:data/write/pit";

private CreatePITAction() {
super(NAME, CreatePITResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Controller for creating PIT reader context
* Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive
* Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
* fail user request if any of the updates in this phase are failed
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also mention that we are deleting a pit in the partial failure scenario

*/
public class CreatePITController implements Runnable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can all 3 - create, delete, list PIT transport layers invoke the same "PITController" class rather than having 3 separate classes(I presume that's what was planned based on the naming of this class).

All PIT crud operation logic could be handled from here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the changes to have all together in single controller but found that nothing is reusable across APIs. The request, listeners etc are different and accepts different params and gives different responses. So reverted it back to having different controllers for different flows.

private final Runnable runner;
private final SearchTransportService searchTransportService;
private final ClusterService clusterService;
private final TransportSearchAction transportSearchAction;
private final NamedWriteableRegistry namedWriteableRegistry;
private final Task task;
private final ActionListener<CreatePITResponse> listener;
private final CreatePITRequest request;
private static final Logger logger = LogManager.getLogger(CreatePITController.class);

public CreatePITController(
CreatePITRequest request,
SearchTransportService searchTransportService,
ClusterService clusterService,
TransportSearchAction transportSearchAction,
NamedWriteableRegistry namedWriteableRegistry,
Task task,
ActionListener<CreatePITResponse> listener
) {
this.searchTransportService = searchTransportService;
this.clusterService = clusterService;
this.transportSearchAction = transportSearchAction;
this.namedWriteableRegistry = namedWriteableRegistry;
this.task = task;
this.listener = listener;
this.request = request;
runner = this::executeCreatePit;
}

private TimeValue getCreatePitTemporaryKeepAlive() {
return SearchService.CREATE_PIT_TEMPORARY_KEEPALIVE_SETTING.get(clusterService.getSettings());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this setting in SearchService? Shouldn't this setting be in TransportCreatePITAction?

}

public void executeCreatePit() {
SearchRequest searchRequest = new SearchRequest(request.getIndices());
searchRequest.preference(request.getPreference());
searchRequest.routing(request.getRouting());
searchRequest.indicesOptions(request.getIndicesOptions());
searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation());

SearchTask searchTask = new SearchTask(
task.getId(),
task.getType(),
task.getAction(),
() -> task.getDescription(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have a proper description detailing why this search is being executed
?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

task description already contains pit details which is relevant to this operation , do we need more info ?

task.getParentTaskId(),
task.getHeaders()
);

final StepListener<CreatePITResponse> createPitListener = new StepListener<>();

final ActionListener<CreatePITResponse> updatePitIdListener = ActionListener.wrap(r -> listener.onResponse(r), e -> {
logger.error("PIT creation failed while updating PIT ID", e);
listener.onFailure(e);
});
/**
* Phase 1 of create PIT
*/
executeCreatePit(searchTask, searchRequest, createPitListener);

/**
* Phase 2 of create PIT where we update pit id in pit contexts
*/
executeUpdatePitId(request, createPitListener, updatePitIdListener);
}

/**
* Creates PIT reader context with temporary keep alive
*/
public void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<CreatePITResponse> createPitListener) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

package-private? We just need for functional testing right?

transportSearchAction.executeRequest(
task,
searchRequest,
TransportCreatePITAction.CREATE_PIT_ACTION,
true,
new TransportSearchAction.SinglePhaseSearchAction() {
@Override
public void executeOnShardTarget(
SearchTask searchTask,
SearchShardTarget target,
Transport.Connection connection,
ActionListener<SearchPhaseResult> searchPhaseResultActionListener
) {
searchTransportService.createPitContext(
connection,
new TransportCreatePITAction.CreateReaderContextRequest(target.getShardId(), getCreatePitTemporaryKeepAlive()),
searchTask,
ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure)
);
}
},
createPitListener
);
}

/**
* Updates PIT ID, keep alive and createdTime of PIT reader context
*/
public void executeUpdatePitId(
CreatePITRequest request,
StepListener<CreatePITResponse> createPitListener,
ActionListener<CreatePITResponse> updatePitIdListener
) {
createPitListener.whenComplete(createPITResponse -> {
SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(contextId);
lookupListener.whenComplete(nodelookup -> {
final ActionListener<UpdatePitContextResponse> groupedActionListener = getGroupedListener(
updatePitIdListener,
createPITResponse,
contextId.shards().size(),
contextId.shards().values()
);
/**
* store the create time ( same create time for all PIT contexts across shards ) to be used
* for list PIT api
*/
long createTime = System.currentTimeMillis();
for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
try {
final Transport.Connection connection = searchTransportService.getConnection(
entry.getValue().getClusterAlias(),
node
);
searchTransportService.updatePitContext(
connection,
new UpdatePITContextRequest(
entry.getValue().getSearchContextId(),
createPITResponse.getId(),
request.getKeepAlive().millis(),
createTime
),
groupedActionListener
);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Create pit update phase failed on node [{}]", node), e);
groupedActionListener.onFailure(new OpenSearchException("Create pit failed on node[" + node + "]", e));
}
}
}, updatePitIdListener::onFailure);
}, updatePitIdListener::onFailure);
}

private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
ClusterState state = clusterService.state();

final Set<String> clusters = contextId.shards()
.values()
.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());

final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
}
return lookupListener;
}

private ActionListener<UpdatePitContextResponse> getGroupedListener(
ActionListener<CreatePITResponse> updatePitIdListener,
CreatePITResponse createPITResponse,
int size,
Collection<SearchContextIdForNode> contexts
) {
return new GroupedActionListener<>(new ActionListener<>() {
@Override
public void onResponse(final Collection<UpdatePitContextResponse> responses) {
updatePitIdListener.onResponse(createPITResponse);
}

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts);
updatePitIdListener.onFailure(e);
}
}, size);
}

/**
* Cleanup all created PIT contexts in case of failure
*/
private void cleanupContexts(Collection<SearchContextIdForNode> contexts) {
ActionListener<Integer> deleteListener = new ActionListener<>() {
@Override
public void onResponse(Integer freed) {
// log the number of freed contexts - this is invoke and forget call
logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size()));
}

@Override
public void onFailure(Exception e) {
logger.debug("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
}

@Override
public void run() {
runner.run();
}
}
Loading