Skip to content

Commit

Permalink
Retry point in time on other copy when possible (#66713)
Browse files Browse the repository at this point in the history
Relates #61062
  • Loading branch information
dnhatn authored Jan 9, 2021
1 parent fc57255 commit 59082c0
Show file tree
Hide file tree
Showing 20 changed files with 445 additions and 81 deletions.
4 changes: 2 additions & 2 deletions docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ POST /_search <1>
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
Expand Down Expand Up @@ -99,7 +99,7 @@ as soon as they are no longer used in search requests.
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
"id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
---------------------------------------
// TEST[catch:missing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ The API returns a PIT ID.
[source,console-result]
----
{
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
----
// TESTRESPONSE[s/"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]

To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
Expand All @@ -86,7 +86,7 @@ GET /_search
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
Expand All @@ -106,7 +106,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
[source,console-result]
----
{
"pit_id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
"pit_id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"took" : 17,
"timed_out" : false,
"_shards" : ...,
Expand Down Expand Up @@ -150,7 +150,7 @@ GET /_search
}
},
"pit": {
"id": "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA==", <1>
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [
Expand Down Expand Up @@ -178,7 +178,7 @@ When you're finished, you should delete your PIT.
----
DELETE /_pit
{
"id" : "46ToAwEPbXktaW5kZXgtMDAwMDAxFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAFldicVdzOFFtVHZTZDFoWWowTGkwS0EAAAAAAAAAAAQURzZzcUszUUJ5U1NMX3Jyak5ET0wBFnVzaTVuenpUVGQ2TFNheUxVUG5LVVEAAA=="
"id" : "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
}
----
// TEST[catch:missing]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.transport.Transport;
Expand Down Expand Up @@ -478,7 +481,7 @@ public final void onShardFailure(final int shardIndex, SearchShardTarget shardTa
} else {
// the failure is already present, try and not override it with an exception that is less meaningless
// for example, getting illegal shard state
if (TransportActions.isReadOverrideException(e)) {
if (TransportActions.isReadOverrideException(e) && (e instanceof SearchContextMissingException == false)) {
shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
}
}
Expand Down Expand Up @@ -567,6 +570,16 @@ public final SearchRequest getRequest() {
return request;
}

@Override
public boolean isPartOfPointInTime(ShardSearchContextId contextId) {
final PointInTimeBuilder pointInTimeBuilder = request.pointInTimeBuilder();
if (pointInTimeBuilder != null) {
return request.pointInTimeBuilder().getSearchContextId(searchTransportService.getNamedWriteableRegistry()).contains(contextId);
} else {
return false;
}
}

protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
String scrollId, String searchContextId) {
int numSuccess = successfulOps.get();
Expand Down Expand Up @@ -598,7 +611,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion);
} else {
if (request.source() != null && request.source().pointInTimeBuilder() != null) {
searchContextId = request.source().pointInTimeBuilder().getId();
searchContextId = request.source().pointInTimeBuilder().getEncodedId();
} else {
searchContextId = null;
}
Expand All @@ -619,21 +632,19 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause)
* @param exception the exception explaining or causing the phase failure
*/
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
// we don't release persistent readers (point in time).
if (request.pointInTimeBuilder() == null) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
results.getSuccessfulResults().forEach((entry) -> {
// Do not release search contexts that are part of the point in time
if (entry.getContextId() != null && isPartOfPointInTime(entry.getContextId()) == false) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
});
}
}
});
Releasables.close(releasables);
listener.onFailure(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void onFailure(Exception exception) {
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
if (context.getRequest().pointInTimeBuilder() == null) {
if (context.isPartOfPointInTime(querySearchRequest.contextId()) == false) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
if (queryResult.hasSearchContext()
&& context.getRequest().scroll() == null
&& context.getRequest().pointInTimeBuilder() == null) {
&& (context.isPartOfPointInTime(queryResult.getContextId()) == false)) {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
Expand All @@ -43,14 +44,17 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class SearchContextId {
public final class SearchContextId {
private final Map<ShardId, SearchContextIdForNode> shards;
private final Map<String, AliasFilter> aliasFilter;
private transient Set<ShardSearchContextId> contextIds;

private SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
SearchContextId(Map<ShardId, SearchContextIdForNode> shards, Map<String, AliasFilter> aliasFilter) {
this.shards = shards;
this.aliasFilter = aliasFilter;
this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet());
}

public Map<ShardId, SearchContextIdForNode> shards() {
Expand All @@ -61,6 +65,10 @@ public Map<String, AliasFilter> aliasFilter() {
return aliasFilter;
}

public boolean contains(ShardSearchContextId contextId) {
return contextIds.contains(contextId);
}

public static String encode(List<SearchPhaseResult> searchPhaseResults, Map<String, AliasFilter> aliasFilter, Version version) {
final Map<ShardId, SearchContextIdForNode> shards = new HashMap<>();
for (SearchPhaseResult searchPhaseResult : searchPhaseResults) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ interface SearchPhaseContext extends Executor {
*/
SearchRequest getRequest();

/**
* Checks if the given context id is part of the point in time of this search (if exists).
* We should not release search contexts that belong to the point in time during or after searches.
*/
boolean isPartOfPointInTime(ShardSearchContextId contextId);

/**
* Builds and sends the final search response back to the user.
*
Expand Down Expand Up @@ -108,6 +114,7 @@ interface SearchPhaseContext extends Executor {
default void sendReleaseSearchContext(ShardSearchContextId contextId,
Transport.Connection connection,
OriginalIndices originalIndices) {
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
if (connection != null) {
getSearchTransport().sendFreeContext(connection, contextId, originalIndices);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -438,4 +439,8 @@ public void cancelSearchTask(SearchTask task, String reason) {
// force the origin to execute the cancellation as a system user
new OriginSettingClient(client, GetTaskAction.TASKS_ORIGIN).admin().cluster().cancelTasks(req, ActionListener.wrap(() -> {}));
}

public NamedWriteableRegistry getNamedWriteableRegistry() {
return client.getNamedWriteableRegistry();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -283,7 +284,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
} else {
searchContext = null;
Expand Down Expand Up @@ -580,7 +581,15 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(Map<Stri
final String clusterAlias = entry.getKey();
final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
final List<String> targetNodes = List.of(perNode.getNode());
final List<String> targetNodes = new ArrayList<>(group.getShards().length);
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : group.getShards()) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
SearchShardIterator shardIterator = new SearchShardIterator(clusterAlias, shardId, targetNodes,
remoteClusterIndices.get(clusterAlias), perNode.getSearchContextId(), searchContextKeepAlive);
remoteShardIterators.add(shardIterator);
Expand Down Expand Up @@ -914,8 +923,16 @@ static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(Clus
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = List.of(perNode.getNode());
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = new ArrayList<>(shards.size());
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
iterators.add(new SearchShardIterator(localClusterAlias, shardId, targetNodes, originalIndices,
perNode.getSearchContextId(), keepAlive));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ static void preparePointInTime(SearchRequest request, RestRequest restRequest, N
indicesOptions.ignoreUnavailable(), indicesOptions.allowNoIndices(), false, false, false,
true, true, indicesOptions.ignoreThrottled());
request.indicesOptions(stricterIndicesOptions);
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.pointInTimeBuilder().getId());
final SearchContextId searchContextId = request.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
request.indices(searchContextId.getActualIndices());
}

Expand Down
Loading

0 comments on commit 59082c0

Please sign in to comment.