Adding support for allow_partial_search_results in PIT (#111516)
pmpailis authored Aug 26, 2024
1 parent 6b96226 commit 785fe53
Showing 24 changed files with 530 additions and 93 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111516.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111516
summary: Adding support for `allow_partial_search_results` in PIT
area: Search
type: enhancement
issues: []
38 changes: 38 additions & 0 deletions docs/reference/search/point-in-time-api.asciidoc
@@ -78,6 +78,44 @@ IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.

In addition to the `keep_alive` parameter, the `allow_partial_search_results` parameter
can also be defined.
This parameter determines whether the <<point-in-time-api, point in time (PIT)>>
should tolerate unavailable shards or <<shard-failures, shard failures>> when
initially creating the PIT.
If set to `true`, the PIT is created with the available shards, along with a
reference to any missing ones.
If set to `false`, the operation fails if any shard is unavailable.
Defaults to `false`.

The PIT response includes a summary of the total number of shards, as well as the number
of successful shards when creating the PIT.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m&allow_partial_search_results=true
--------------------------------------------------
// TEST[setup:my_index]

[source,js]
--------------------------------------------------
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=",
"_shards": {
"total": 10,
"successful": 10,
"skipped": 0,
"failed": 0
}
}
--------------------------------------------------
// NOTCONSOLE

When a PIT that contains shard failures is used in a search request, the missing
shards are always reported in the search response as a `NoShardAvailableActionException`
exception. To get rid of these exceptions, a new PIT needs to be created so that
shards missing from the previous PIT can be handled, assuming they become available
in the meantime.
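
Before reusing such a PIT for paginated searches, a client can therefore check the
`_shards` summary returned when the PIT was opened and recreate the PIT if any shards
were missing. A minimal sketch of that decision (the helper names and policy are
illustrative, not part of the Elasticsearch API):

```java
// Illustrative helper: interpret the `_shards` summary of an
// open-point-in-time response created with allow_partial_search_results=true.
public class PitShards {

    // A PIT is complete when every shard participated in its creation.
    public static boolean isComplete(int totalShards, int successfulShards) {
        return successfulShards == totalShards;
    }

    // Recreate the PIT when it was created with missing shards; searches
    // against it would otherwise keep reporting those shards as
    // NoShardAvailableActionException failures.
    public static boolean shouldRecreatePit(int totalShards, int successfulShards) {
        return !isComplete(totalShards, successfulShards);
    }
}
```

For the response shown above (`"total": 10, "successful": 10`) no recreation is needed;
a response with fewer successful shards than total shards signals a partial PIT.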

[[point-in-time-keep-alive]]
==== Keeping point in time alive
The `keep_alive` parameter, which is passed to an open point in time request and
Expand Up @@ -106,9 +106,9 @@ The search response includes an array of `sort` values for each hit:
"_id" : "654322",
"_score" : null,
"_source" : ...,
"sort" : [
"sort" : [
1463538855,
"654322"
"654322"
]
},
{
@@ -118,7 +118,7 @@ The search response includes an array of `sort` values for each hit:
"_source" : ...,
"sort" : [ <1>
1463538857,
"654323"
"654323"
]
}
]
@@ -150,7 +150,7 @@ GET twitter/_search
--------------------------------------------------
//TEST[continued]

Repeat this process by updating the `search_after` array every time you retrieve a
new page of results. If a <<near-real-time,refresh>> occurs between these requests,
the order of your results may change, causing inconsistent results across pages. To
prevent this, you can create a <<point-in-time-api,point in time (PIT)>> to
@@ -167,10 +167,12 @@ The API returns a PIT ID.
[source,console-result]
----
{
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==",
"_shards": ...
}
----
// TESTRESPONSE[s/"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA=="/"id": $body.id/]
// TESTRESPONSE[s/"_shards": \.\.\./"_shards": "$body._shards"/]

To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit


@@ -195,6 +195,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_PROFILE_SLEEPS = def(8_725_00_0);
public static final TransportVersion ZDT_NANOS_SUPPORT = def(8_726_00_0);
public static final TransportVersion LTR_SERVERLESS_RELEASE = def(8_727_00_0);
public static final TransportVersion ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT = def(8_728_00_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
@@ -707,7 +707,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults) : null;
final BytesReference searchContextId;
if (buildPointInTimeFromSearchResults()) {
searchContextId = SearchContextId.encode(queryResults.asList(), aliasFilter, minTransportVersion, failures);
} else {
if (request.source() != null
&& request.source().pointInTimeBuilder() != null
@@ -166,6 +166,10 @@ public static void closeContexts(
final var successes = new AtomicInteger();
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
for (SearchContextIdForNode contextId : contextIds) {
if (contextId.getNode() == null) {
// the shard was missing when creating the PIT, ignore.
continue;
}
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node != null) {
try {
@@ -41,6 +41,8 @@ public final class OpenPointInTimeRequest extends ActionRequest implements Indic

private QueryBuilder indexFilter;

private boolean allowPartialSearchResults = false;

public static final IndicesOptions DEFAULT_INDICES_OPTIONS = SearchRequest.DEFAULT_INDICES_OPTIONS;

public OpenPointInTimeRequest(String... indices) {
@@ -60,6 +62,9 @@ public OpenPointInTimeRequest(StreamInput in) throws IOException {
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
this.indexFilter = in.readOptionalNamedWriteable(QueryBuilder.class);
}
if (in.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
this.allowPartialSearchResults = in.readBoolean();
}
}

@Override
@@ -76,6 +81,11 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_12_0)) {
out.writeOptionalWriteable(indexFilter);
}
if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
out.writeBoolean(allowPartialSearchResults);
} else if (allowPartialSearchResults) {
throw new IOException("[allow_partial_search_results] is not supported on nodes with version " + out.getTransportVersion());
}
}
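
The write path above uses a common transport-version-gating pattern: the new field is
only serialized to peers that understand it, and the request fails fast if the flag is
set but the peer is too old, rather than silently dropping it. A generic, self-contained
sketch of the same idea (class name, stream type, and the version constant are
illustrative, not Elasticsearch internals):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGatedWriter {

    // Hypothetical wire version at which the new boolean field was introduced.
    static final int ALLOW_PARTIAL_VERSION = 8_728_00_0;

    // Serialize the flag only for peers on or after the introducing version;
    // if the peer is older and the flag is set, throw instead of losing it.
    public static byte[] write(int peerVersion, boolean allowPartial) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            if (peerVersion >= ALLOW_PARTIAL_VERSION) {
                out.writeBoolean(allowPartial);
            } else if (allowPartial) {
                throw new IOException(
                    "[allow_partial_search_results] is not supported on version " + peerVersion);
            }
        }
        return bytes.toByteArray();
    }
}
```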

@Override
@@ -180,6 +190,15 @@ public boolean includeDataStreams() {
return true;
}

public boolean allowPartialSearchResults() {
return allowPartialSearchResults;
}

public OpenPointInTimeRequest allowPartialSearchResults(boolean allowPartialSearchResults) {
this.allowPartialSearchResults = allowPartialSearchResults;
return this;
}

@Override
public String getDescription() {
return "open search context: indices [" + String.join(",", indices) + "] keep_alive [" + keepAlive + "]";
@@ -200,6 +219,8 @@ public String toString() {
+ ", preference='"
+ preference
+ '\''
+ ", allowPartialSearchResults="
+ allowPartialSearchResults
+ '}';
}

@@ -218,12 +239,13 @@ public boolean equals(Object o) {
&& indicesOptions.equals(that.indicesOptions)
&& keepAlive.equals(that.keepAlive)
&& Objects.equals(routing, that.routing)
&& Objects.equals(preference, that.preference)
&& Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults);
}

@Override
public int hashCode() {
int result = Objects.hash(indicesOptions, keepAlive, maxConcurrentShardRequests, routing, preference, allowPartialSearchResults);
result = 31 * result + Arrays.hashCode(indices);
return result;
}
@@ -8,6 +8,7 @@

package org.elasticsearch.action.search;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -18,22 +19,46 @@
import java.util.Base64;
import java.util.Objects;

import static org.elasticsearch.rest.action.RestActions.buildBroadcastShardsHeader;

public final class OpenPointInTimeResponse extends ActionResponse implements ToXContentObject {
private final BytesReference pointInTimeId;

public OpenPointInTimeResponse(BytesReference pointInTimeId) {
private final int totalShards;
private final int successfulShards;
private final int failedShards;
private final int skippedShards;

public OpenPointInTimeResponse(
BytesReference pointInTimeId,
int totalShards,
int successfulShards,
int failedShards,
int skippedShards
) {
this.pointInTimeId = Objects.requireNonNull(pointInTimeId, "Point in time parameter must be not null");
this.totalShards = totalShards;
this.successfulShards = successfulShards;
this.failedShards = failedShards;
this.skippedShards = skippedShards;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBytesReference(pointInTimeId);
if (out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)) {
out.writeVInt(totalShards);
out.writeVInt(successfulShards);
out.writeVInt(failedShards);
out.writeVInt(skippedShards);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("id", Base64.getUrlEncoder().encodeToString(BytesReference.toBytes(pointInTimeId)));
buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, failedShards, skippedShards, null);
builder.endObject();
return builder;
}
@@ -42,4 +67,19 @@ public BytesReference getPointInTimeId() {
return pointInTimeId;
}

public int getTotalShards() {
return totalShards;
}

public int getSuccessfulShards() {
return successfulShards;
}

public int getFailedShards() {
return failedShards;
}

public int getSkippedShards() {
return skippedShards;
}
}
@@ -47,6 +47,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
openRequest.routing(request.param("routing"));
openRequest.preference(request.param("preference"));
openRequest.keepAlive(TimeValue.parseTimeValue(request.param("keep_alive"), null, "keep_alive"));
openRequest.allowPartialSearchResults(request.paramAsBoolean("allow_partial_search_results", false));
if (request.hasParam("max_concurrent_shard_requests")) {
final int maxConcurrentShardRequests = request.paramAsInt(
"max_concurrent_shard_requests",
@@ -9,6 +9,7 @@
package org.elasticsearch.action.search;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -58,12 +59,30 @@ public boolean contains(ShardSearchContextId contextId) {
public static BytesReference encode(
List<SearchPhaseResult> searchPhaseResults,
Map<String, AliasFilter> aliasFilter,
TransportVersion version,
ShardSearchFailure[] shardFailures
) {
assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT)
: "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version ["
+ TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT
+ "] or higher.";
try (var out = new BytesStreamOutput()) {
out.setTransportVersion(version);
TransportVersion.writeVersion(version, out);
out.writeCollection(searchPhaseResults, SearchContextId::writeSearchPhaseResult);
boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.ALLOW_PARTIAL_SEARCH_RESULTS_IN_PIT);
int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0);
out.writeVInt(shardSize);
for (var searchResult : searchPhaseResults) {
final SearchShardTarget target = searchResult.getSearchShardTarget();
target.getShardId().writeTo(out);
new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out);
}
if (allowNullContextId) {
for (var failure : shardFailures) {
failure.shard().getShardId().writeTo(out);
new SearchContextIdForNode(failure.shard().getClusterAlias(), null, null).writeTo(out);
}
}
out.writeMap(aliasFilter, StreamOutput::writeWriteable);
return out.bytes();
} catch (IOException e) {
@@ -72,12 +91,6 @@ public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegist
}
}

private static void writeSearchPhaseResult(StreamOutput out, SearchPhaseResult searchPhaseResult) throws IOException {
final SearchShardTarget target = searchPhaseResult.getSearchShardTarget();
target.getShardId().writeTo(out);
new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchPhaseResult.getContextId()).writeTo(out);
}

public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegistry, BytesReference id) {
try (var in = new NamedWriteableAwareStreamInput(id.streamInput(), namedWriteableRegistry)) {
final TransportVersion version = TransportVersion.readVersion(in);