Skip to content

Commit

Permalink
Filter original indices in shard level request (#78508)
Browse files Browse the repository at this point in the history
Today the search action send the full list of original indices on every shard request.
This change restricts the list to the concrete index of the shard or the alias that was used to resolve it.

Relates #78314
  • Loading branch information
jimczi authored Oct 14, 2021
1 parent 05b52de commit 5416582
Show file tree
Hide file tree
Showing 62 changed files with 499 additions and 255 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.QueryPhaseResultConsumer;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchProgressListener;
Expand Down Expand Up @@ -183,9 +182,7 @@ public SearchPhaseController.ReducedQueryPhase reduceAggs(TermsList candidateLis
new DocValueFormat[] { DocValueFormat.RAW }
);
result.aggregations(candidateList.get(i));
result.setSearchShardTarget(
new SearchShardTarget("node", new ShardId(new Index("index", "index"), i), null, OriginalIndices.NONE)
);
result.setSearchShardTarget(new SearchShardTarget("node", new ShardId(new Index("index", "index"), i), null));
shards.add(result);
}
SearchRequest request = new SearchRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private static ShardSearchFailure createShardFailureTestItem() {
String nodeId = randomAlphaOfLengthBetween(5, 10);
String indexName = randomAlphaOfLengthBetween(5, 10);
searchShardTarget = new SearchShardTarget(nodeId,
new ShardId(new Index(indexName, IndexMetadata.INDEX_UUID_NA_VALUE), randomInt()), null, null);
new ShardId(new Index(indexName, IndexMetadata.INDEX_UUID_NA_VALUE), randomInt()), null);
}
return new ShardSearchFailure(ex, searchShardTarget);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -65,7 +64,7 @@ public void testDCGAt() {
rated.add(new RatedDocument("index", Integer.toString(i), relevanceRatings[i]));
hits[i] = new SearchHit(i, Integer.toString(i), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}
DiscountedCumulativeGain dcg = new DiscountedCumulativeGain();
assertEquals(EXPECTED_DCG, dcg.evaluate("id", hits, rated).metricScore(), DELTA);
Expand Down Expand Up @@ -116,7 +115,7 @@ public void testDCGAtSixMissingRatings() {
}
hits[i] = new SearchHit(i, Integer.toString(i), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}
DiscountedCumulativeGain dcg = new DiscountedCumulativeGain();
EvalQueryQuality result = dcg.evaluate("id", hits, rated);
Expand Down Expand Up @@ -174,7 +173,7 @@ public void testDCGAtFourMoreRatings() {
for (int i = 0; i < 4; i++) {
hits[i] = new SearchHit(i, Integer.toString(i), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}
DiscountedCumulativeGain dcg = new DiscountedCumulativeGain();
EvalQueryQuality result = dcg.evaluate("id", hits, ratedDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -46,7 +45,7 @@ public static EvalQueryQuality randomEvalQueryQuality() {
for (int i = 0; i < numberOfSearchHits; i++) {
RatedSearchHit ratedSearchHit = RatedSearchHitTests.randomRatedSearchHit();
// we need to associate each hit with an index name otherwise rendering will not work
ratedSearchHit.getSearchHit().shard(new SearchShardTarget("_na_", new ShardId("index", "_na_", 0), null, OriginalIndices.NONE));
ratedSearchHit.getSearchHit().shard(new SearchShardTarget("_na_", new ShardId("index", "_na_", 0), null));
ratedHits.add(ratedSearchHit);
}
EvalQueryQuality evalQueryQuality = new EvalQueryQuality(randomAlphaOfLength(10),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.text.Text;
Expand Down Expand Up @@ -108,7 +107,7 @@ private SearchHit[] createSearchHits(List<RatedDocument> rated, Integer[] releva
}
hits[i] = new SearchHit(i, Integer.toString(i), new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}
return hits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.text.Text;
Expand Down Expand Up @@ -194,7 +193,7 @@ private static SearchHit[] createSearchHits(int from, int to, String index) {
SearchHit[] hits = new SearchHit[to + 1 - from];
for (int i = from; i <= to; i++) {
hits[i] = new SearchHit(i, i + "", new Text(""), Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null));
}
return hits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.text.Text;
Expand Down Expand Up @@ -104,7 +103,7 @@ public void testIgnoreUnlabeled() {
// add an unlabeled search hit
SearchHit[] searchHits = Arrays.copyOf(toSearchHits(rated, "test"), 3);
searchHits[2] = new SearchHit(2, "2", new Text(MapperService.SINGLE_MAPPING_NAME), Collections.emptyMap(), Collections.emptyMap());
searchHits[2].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
searchHits[2].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));

EvalQueryQuality evaluated = (new PrecisionAtK()).evaluate("id", searchHits, rated);
assertEquals((double) 2 / 3, evaluated.metricScore(), 0.00001);
Expand All @@ -124,7 +123,7 @@ public void testNoRatedDocs() throws Exception {
for (int i = 0; i < 5; i++) {
hits[i] = new SearchHit(i, i + "", new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}
EvalQueryQuality evaluated = (new PrecisionAtK()).evaluate("id", hits, Collections.emptyList());
assertEquals(0.0d, evaluated.metricScore(), 0.00001);
Expand Down Expand Up @@ -246,7 +245,7 @@ private static SearchHit[] toSearchHits(List<RatedDocument> rated, String index)
SearchHit[] hits = new SearchHit[rated.size()];
for (int i = 0; i < rated.size(); i++) {
hits[i] = new SearchHit(i, i + "", new Text(""), Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null));
}
return hits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.index.rankeval;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand Down Expand Up @@ -60,7 +59,7 @@ public class RankEvalResponseTests extends ESTestCase {
new IllegalArgumentException("Closed resource", new RuntimeException("Resource")),
new SearchPhaseExecutionException("search", "all shards failed",
new ShardSearchFailure[] { new ShardSearchFailure(new ParsingException(1, 2, "foobar", null),
new SearchShardTarget("node_1", new ShardId("foo", "_na_", 1), null, OriginalIndices.NONE)) }),
new SearchShardTarget("node_1", new ShardId("foo", "_na_", 1), null)) }),
new ElasticsearchException("Parsing failed",
new ParsingException(9, 42, "Wrong state", new NullPointerException("Unexpected null value"))) };

Expand Down Expand Up @@ -172,7 +171,7 @@ public void testToXContent() throws IOException {
private static RatedSearchHit searchHit(String index, int docId, Integer rating) {
SearchHit hit = new SearchHit(docId, docId + "", new Text(MapperService.SINGLE_MAPPING_NAME),
Collections.emptyMap(), Collections.emptyMap());
hit.shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null, OriginalIndices.NONE));
hit.shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null));
hit.score(1.0f);
return new RatedSearchHit(hit, rating != null ? OptionalInt.of(rating) : OptionalInt.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.elasticsearch.index.rankeval;

import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.text.Text;
Expand Down Expand Up @@ -105,7 +104,7 @@ public void testNoRatedDocs() throws Exception {
SearchHit[] hits = new SearchHit[k];
for (int i = 0; i < k; i++) {
hits[i] = new SearchHit(i, i + "", new Text(""), Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId("index", "uuid", 0), null));
}

EvalQueryQuality evaluated = (new RecallAtK()).evaluate("id", hits, Collections.emptyList());
Expand Down Expand Up @@ -227,7 +226,7 @@ private static SearchHit[] toSearchHits(List<RatedDocument> rated, String index)
SearchHit[] hits = new SearchHit[rated.size()];
for (int i = 0; i < rated.size(); i++) {
hits[i] = new SearchHit(i, i + "", new Text(""), Collections.emptyMap(), Collections.emptyMap());
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null, OriginalIndices.NONE));
hits[i].shard(new SearchShardTarget("testnode", new ShardId(index, "uuid", 0), null));
}
return hits;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,9 +556,10 @@ public void testSearchQueryThenFetch() throws Exception {
assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));

clearInterceptedActions();
assertSameIndices(searchRequest, SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

public void testSearchDfsQueryThenFetch() throws Exception {
Expand All @@ -577,10 +578,10 @@ public void testSearchDfsQueryThenFetch() throws Exception {
assertThat(searchResponse.getHits().getTotalHits().value, greaterThan(0L));

clearInterceptedActions();
assertSameIndices(searchRequest, SearchTransportService.DFS_ACTION_NAME, SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.DFS_ACTION_NAME,
SearchTransportService.QUERY_ID_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);
//free context messages are not necessarily sent, but if they are, check their indices
assertSameIndicesOptionalRequests(searchRequest, SearchTransportService.FREE_CONTEXT_ACTION_NAME);
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
Expand All @@ -604,11 +605,22 @@ private static void assertSameIndices(IndicesRequest originalRequest, boolean op
}
}
}

private static void assertIndicesSubset(List<String> indices, String... actions) {
assertIndicesSubset(indices, false, actions);
}

private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
assertIndicesSubset(indices, true, actions);
}

private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
//indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
List<TransportRequest> requests = consumeTransportRequests(action);
assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0));
if (optional == false) {
assertThat("no internal requests intercepted for action [" + action + "]", requests.size(), greaterThan(0));
}
for (TransportRequest internalRequest : requests) {
IndicesRequest indicesRequest = convertRequest(internalRequest);
for (String index : indicesRequest.indices()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ protected void masterOperation(final ClusterSearchShardsRequest request, final C
Set<String> indicesAndAliases = indexNameExpressionResolver.resolveExpressions(clusterState, request.indices());
for (String index : concreteIndices) {
final AliasFilter aliasFilter = indicesService.buildAliasFilter(clusterState, index, indicesAndAliases);
final String[] aliases = indexNameExpressionResolver.indexAliases(clusterState, index, aliasMetadata -> true, true,
indicesAndAliases);
final String[] aliases = indexNameExpressionResolver.indexAliases(clusterState, index,
aliasMetadata -> true, dataStreamAlias -> true, true, indicesAndAliases);
indicesAndFilters.put(index, new AliasFilter(aliasFilter.getQueryBuilder(), aliases));
}

Expand Down
Loading

0 comments on commit 5416582

Please sign in to comment.