Add automatic tiebreaker for search requests that use a PIT #68833

Merged (11 commits) on Feb 17, 2021
@@ -71,9 +71,13 @@ To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
the target data stream or index from the request path.

IMPORTANT: We recommend you include a tiebreaker field in your `sort`. This
tiebreaker field should contain a unique value for each document. If you don't
include a tiebreaker field, your paged results could miss or duplicate hits.
IMPORTANT: If you cannot use a PIT, we recommend that you include a tiebreaker field
in your `sort`. This tiebreaker field should contain a unique value for each document.
If you don't include a tiebreaker field, your paged results could miss or duplicate hits.
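The missed-or-duplicated-hits risk is easy to reproduce with a toy model: when two documents share the same sort value, paging with `search_after` on that value alone skips one of them. The sketch below is purely illustrative (an in-memory document list and invented helper names, not an Elasticsearch API):

```python
# Toy model of search_after paging over an in-memory document list.
# Documents "b" and "c" deliberately share the same timestamp.
docs = [
    {"id": "a", "ts": 100},
    {"id": "b", "ts": 200},
    {"id": "c", "ts": 200},  # duplicate sort value
    {"id": "d", "ts": 300},
]

def page_without_tiebreaker(after_ts, size=2):
    # search_after semantics: return documents strictly after the cursor
    return [d for d in docs if d["ts"] > after_ts][:size]

def page_with_tiebreaker(after, size=2):
    # compare (ts, id) tuples: the unique id breaks the tie
    return [d for d in docs if (d["ts"], d["id"]) > after][:size]

first = page_without_tiebreaker(0)                  # "a", "b"
second = page_without_tiebreaker(first[-1]["ts"])   # only "d" -- "c" was skipped
```

Paging on the timestamp alone silently drops `c`, while the tuple cursor with the unique `id` returns all four documents across two pages.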

NOTE: Search after requests have optimizations that make them faster when the sort
order is `_doc` and total hits are not tracked. If you want to iterate over all documents regardless of the
order, this is the most efficient option.

[source,console]
----
@@ -90,18 +94,47 @@ GET /_search
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
]
}
----
// TEST[catch:missing]

<1> PIT ID for the search.
<2> Sorts hits for the search.
<2> Sorts hits for the search with an implicit tiebreak on `_shard_doc` ascending.

The search response includes an array of `sort` values for each hit. If you used
a PIT, the response's `pit_id` parameter contains an updated PIT ID.
a PIT, a tiebreaker is included as the last `sort` value for each hit.
This tiebreaker, called `_shard_doc`, is added automatically to every search request that uses a PIT.
The `_shard_doc` value is a combination of the shard index within the PIT and Lucene's internal doc ID;
it is unique per document and constant within a PIT.
You can also add the tiebreaker explicitly in the search request to customize the order:

[source,console]
----
GET /_search
{
"size": 10000,
"query": {
"match" : {
"user.id" : "elkbee"
}
},
"pit": {
"id": "46ToAwMDaWR5BXV1aWQyKwZub2RlXzMAAAAAAAAAACoBYwADaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQADaWR5BXV1aWQyKgZub2RlXzIAAAAAAAAAAAwBYgACBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <1>
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": "asc"},
{"_shard_doc": "desc"}
]
}
----
// TEST[catch:missing]

<1> PIT ID for the search.
<2> Sorts hits for the search with an explicit tiebreak on `_shard_doc` descending.


[source,console-result]
----
@@ -122,7 +155,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
"_source" : ...,
"sort" : [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
4294967298 <3>
]
}
]
@@ -133,9 +166,10 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.

<1> Updated `id` for the point in time.
<2> Sort values for the last returned hit.
<3> The tiebreaker value, unique per document within the `pit_id`.
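As a sanity check on the sample value above, a tiebreaker of this shape can be decoded by hand. The bit layout below (shard index in the upper 32 bits, Lucene doc ID in the lower 32 bits) is an assumption inferred from the description of `_shard_doc`, not a documented contract, and the helper names are invented:

```python
def pack_shard_doc(shard_index: int, doc_id: int) -> int:
    # assumed layout: shard position within the PIT in the high 32 bits,
    # Lucene's internal doc ID in the low 32 bits
    return (shard_index << 32) | doc_id

def unpack_shard_doc(value: int) -> tuple[int, int]:
    return value >> 32, value & 0xFFFFFFFF

# the sample response's sort value decodes to shard 1, doc 2
print(unpack_shard_doc(4294967298))  # -> (1, 2)
```

Under this assumed layout the value is unique per document and stable for the lifetime of the PIT, which is exactly what a tiebreaker needs.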

To get the next page of results, rerun the previous search using the last hit's
sort values as the `search_after` argument. If using a PIT, use the latest PIT
sort values (including the tiebreaker) as the `search_after` argument. If using a PIT, use the latest PIT
ID in the `pit.id` parameter. The search's `query` and `sort` arguments must
remain unchanged. If provided, the `from` argument must be `0` (default) or `-1`.
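The page-fetch cycle just described can be sketched as a client-side loop. The `search` callable below is a stand-in for a real client method; the field names mirror the request and response bodies above, but the loop itself is an illustration under those assumptions, not official client code:

```python
def paginate(search, body, page_size):
    """Yield every hit by replaying the search with search_after cursors.

    `search` is any callable that takes a request body dict and returns a
    response dict shaped like the examples above.
    """
    body = dict(body, size=page_size)
    while True:
        page = search(body)
        hits = page["hits"]["hits"]
        if not hits:
            return
        yield from hits
        # feed the last hit's sort values (tiebreaker included) back in;
        # query and sort stay unchanged between iterations
        body["search_after"] = hits[-1]["sort"]
        # keep using the most recent PIT ID if the response updated it
        if "pit" in body and "pit_id" in page:
            body["pit"]["id"] = page["pit_id"]
```

Note that the loop never modifies `query` or `sort`, matching the requirement that they remain unchanged between pages.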

@@ -154,19 +188,20 @@ GET /_search
"keep_alive": "1m"
},
"sort": [
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
],
"search_after": [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
]
4294967298
],
"track_total_hits": false <3>
}
----
// TEST[catch:missing]

<1> PIT ID returned by the previous search.
<2> Sort values from the previous search's last hit.
<3> Disable the tracking of total hits to speed up pagination.

You can repeat this process to get additional pages of results. If using a PIT,
you can extend the PIT's retention period using the
@@ -19,13 +19,16 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.index.query.QueryRewriteContext;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.ShardDocSortField;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
@@ -48,7 +51,7 @@
* @see org.elasticsearch.client.Client#search(SearchRequest)
* @see SearchResponse
*/
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable {
public class SearchRequest extends ActionRequest implements IndicesRequest.Replaceable, Rewriteable<SearchRequest> {

public static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

@@ -641,6 +644,31 @@ public int resolveTrackTotalHitsUpTo() {
return resolveTrackTotalHitsUpTo(scroll, source);
}

@Override
public SearchRequest rewrite(QueryRewriteContext ctx) throws IOException {
if (source == null) {
return this;
}

SearchSourceBuilder source = this.source.rewrite(ctx);
boolean hasChanged = source != this.source;

// add a sort tiebreaker for PIT search requests if not explicitly set
if (source.pointInTimeBuilder() != null) {
if (source.sorts() == null || source.sorts().isEmpty()) {
source.sort(SortBuilders.scoreSort());
}
SortBuilder<?> lastSort = source.sorts().get(source.sorts().size() - 1);
if (lastSort instanceof FieldSortBuilder == false
|| FieldSortBuilder.SHARD_DOC_FIELD_NAME.equals(((FieldSortBuilder) lastSort).getFieldName()) == false) {
source.sort(SortBuilders.fieldSort(FieldSortBuilder.SHARD_DOC_FIELD_NAME).unmappedType("long"));
hasChanged = true;
}
}

return hasChanged ? new SearchRequest(this).source(source) : this;
}

public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder source) {
if (scroll != null) {
// no matter what the value of track_total_hits is
@@ -259,42 +259,39 @@ boolean buildPointInTimeFromSearchResults() {
}, listener);
}

private void executeRequest(Task task, SearchRequest searchRequest,
SearchAsyncActionProvider searchAsyncActionProvider, ActionListener<SearchResponse> listener) {
private void executeRequest(Task task,
SearchRequest original,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener) {
final long relativeStartNanos = System.nanoTime();
final SearchTimeProvider timeProvider =
new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
searchRequest.source(source);
}
new SearchTimeProvider(original.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
ActionListener<SearchRequest> rewriteListener = ActionListener.wrap(rewritten -> {
final SearchContextId searchContext;
final Map<String, OriginalIndices> remoteClusterIndices;
if (searchRequest.pointInTimeBuilder() != null) {
searchContext = searchRequest.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
if (rewritten.pointInTimeBuilder() != null) {
searchContext = rewritten.pointInTimeBuilder().getSearchContextId(namedWriteableRegistry);
remoteClusterIndices = getIndicesFromSearchContexts(searchContext, rewritten.indicesOptions());
} else {
searchContext = null;
remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(), searchRequest.indices());
remoteClusterIndices = remoteClusterService.groupIndices(rewritten.indicesOptions(), rewritten.indices());
}
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final ClusterState clusterState = clusterService.state();
if (remoteClusterIndices.isEmpty()) {
executeLocalSearch(
task, timeProvider, searchRequest, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
task, timeProvider, rewritten, localIndices, clusterState, listener, searchContext, searchAsyncActionProvider);
} else {
if (shouldMinimizeRoundtrips(searchRequest)) {
if (shouldMinimizeRoundtrips(rewritten)) {
final TaskId parentTaskId = task.taskInfo(clusterService.localNode().getId(), false).getTaskId();
ccsRemoteReduce(parentTaskId, searchRequest, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
ccsRemoteReduce(parentTaskId, rewritten, localIndices, remoteClusterIndices, timeProvider,
searchService.aggReduceContextBuilder(rewritten),
remoteClusterService, threadPool, listener,
(r, l) -> executeLocalSearch(
task, timeProvider, r, localIndices, clusterState, l, searchContext, searchAsyncActionProvider));
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
collectSearchShards(rewritten.indicesOptions(), rewritten.preference(), rewritten.routing(),
skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
ActionListener.wrap(
searchShardsResponses -> {
@@ -305,7 +302,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
if (searchContext != null) {
remoteAliasFilters = searchContext.aliasFilter();
remoteShardIterators = getRemoteShardsIteratorFromPointInTime(searchShardsResponses,
searchContext, searchRequest.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
searchContext, rewritten.pointInTimeBuilder().getKeepAlive(), remoteClusterIndices);
} else {
remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
remoteShardIterators = getRemoteShardsIterator(searchShardsResponses, remoteClusterIndices,
@@ -314,7 +311,7 @@ private void executeRequest(Task task, SearchRequest searchRequest,
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteClusterIndices.size() + localClusters;
int successfulClusters = searchShardsResponses.size() + localClusters;
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteShardIterators,
executeSearch((SearchTask) task, timeProvider, rewritten, localIndices, remoteShardIterators,
clusterNodeLookup, clusterState, remoteAliasFilters, listener,
new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
searchContext, searchAsyncActionProvider);
@@ -323,12 +320,8 @@ private void executeRequest(Task task, SearchRequest searchRequest,
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}
Rewriteable.rewriteAndFetch(original, searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener);
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
@@ -396,8 +396,12 @@ public void testPITTiebreak() throws Exception {
try {
for (int size = 1; size <= numIndex; size++) {
SortOrder order = randomBoolean() ? SortOrder.ASC : SortOrder.DESC;
assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size);
assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.pitTiebreaker().order(order));

assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.fieldSort("value"));
assertPagination(new PointInTimeBuilder(pit), expectedNumDocs, size,
SortBuilders.fieldSort("value"), SortBuilders.pitTiebreaker().order(order));
}
@@ -406,18 +410,21 @@ }
}
}

private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sort) throws Exception {
private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int size, SortBuilder<?>... sorts) throws Exception {
Set<String> seen = new HashSet<>();
SearchRequestBuilder builder = client().prepareSearch()
.setSize(size)
.setPointInTime(pit);
for (SortBuilder<?> sort : sorts) {
builder.addSort(sort);
}
final SearchRequest searchRequest = builder.request().rewrite(null);

final int[] reverseMuls = new int[sort.length];
for (int i = 0; i < sort.length; i++) {
builder.addSort(sort[i]);
reverseMuls[i] = sort[i].order() == SortOrder.ASC ? 1 : -1;
final List<SortBuilder<?>> expectedSorts = searchRequest.source().sorts();
final int[] reverseMuls = new int[expectedSorts.size()];
for (int i = 0; i < expectedSorts.size(); i++) {
reverseMuls[i] = expectedSorts.get(i).order() == SortOrder.ASC ? 1 : -1;
}
final SearchRequest searchRequest = builder.request();
SearchResponse response = client().search(searchRequest).get();
Object[] lastSortValues = null;
while (response.getHits().getHits().length > 0) {
@@ -426,7 +433,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
assertTrue(seen.add(hit.getIndex() + hit.getId()));

if (lastHitSortValues != null) {
for (int i = 0; i < sort.length; i++) {
for (int i = 0; i < expectedSorts.size(); i++) {
Comparable value = (Comparable) hit.getRawSortValues()[i];
int cmp = value.compareTo(lastHitSortValues[i]) * reverseMuls[i];
if (cmp != 0) {
@@ -440,7 +447,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
int len = response.getHits().getHits().length;
SearchHit last = response.getHits().getHits()[len - 1];
if (lastSortValues != null) {
for (int i = 0; i < sort.length; i++) {
for (int i = 0; i < expectedSorts.size(); i++) {
Comparable value = (Comparable) last.getSortValues()[i];
int cmp = value.compareTo(lastSortValues[i]) * reverseMuls[i];
if (cmp != 0) {
@@ -449,7 +456,7 @@ private void assertPagination(PointInTimeBuilder pit, int expectedNumDocs, int s
}
}
}
assertThat(last.getSortValues().length, equalTo(sort.length));
assertThat(last.getSortValues().length, equalTo(expectedSorts.size()));
lastSortValues = last.getSortValues();
searchRequest.source().searchAfter(last.getSortValues());
response = client().search(searchRequest).get();
@@ -675,7 +675,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "123" }
- match: {hits.hits.0.sort: [22, 123] }
- match: {hits.hits.0.sort.0: 22}
- match: {hits.hits.0.sort.1: 123}

- do:
search:
@@ -693,7 +694,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "5" }
- match: {hits.hits.0.sort: [18, 5] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 5}

- do:
search:
@@ -712,7 +714,8 @@
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: "/\\.ds-simple-data-stream1-(\\d{4}\\.\\d{2}\\.\\d{2}-)?000001/" }
- match: {hits.hits.0._id: "1" }
- match: {hits.hits.0.sort: [18, 1] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 1}

- do:
search:
@@ -61,7 +61,8 @@ setup:
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: test }
- match: {hits.hits.0._id: "172" }
- match: {hits.hits.0.sort: [24, 172] }
- match: {hits.hits.0.sort.0: 24}
- match: {hits.hits.0.sort.1: 172}

- do:
index:
@@ -89,7 +90,8 @@ setup:
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: test }
- match: {hits.hits.0._id: "42" }
- match: {hits.hits.0.sort: [18, 42] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 42}

- do:
search:
@@ -107,7 +109,8 @@ setup:
- length: {hits.hits: 1 }
- match: {hits.hits.0._index: test }
- match: {hits.hits.0._id: "1" }
- match: {hits.hits.0.sort: [18, 1] }
- match: {hits.hits.0.sort.0: 18}
- match: {hits.hits.0.sort.1: 1}

- do:
search: