
Commit

Don't throw error for remote shards that open PIT filtered out (elastic#104288)

We recently introduced support for index_filter to the open point in time API (see elastic#102388).
Open point in time supports executing against remote indices, in which case it will open a
reader context against the target remote shards. With support for index_filter, shards that
cannot match the filter are not even included in the PIT id that open PIT returns.

When a subsequent search that includes such a PIT id is executed, one search shards call per cluster
is performed, which returns all shards from the targeted indices, including those that open PIT has
filtered out. In that case, we should just ignore those shards instead of throwing an exception when
they cannot be found in the search context id map built from the PIT id.

Closes elastic#102596
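
For illustration only, here is a minimal sketch of the scenario this fix addresses, using the same
transport-level APIs that the tests below exercise (OpenPointInTimeRequest, RangeQueryBuilder,
PointInTimeBuilder). The index names, the "remote_cluster" alias, and the `client` variable are
assumptions for the sketch, not part of this commit:

// Open a PIT across a local and a remote index with an index_filter; shards that cannot
// match the filter (local or remote) are left out of the returned PIT id entirely.
OpenPointInTimeRequest request = new OpenPointInTimeRequest("local_test", "remote_cluster:remote_test")
    .keepAlive(TimeValue.timeValueMinutes(2));
request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2023-12-15"));
String pitId = client.execute(TransportOpenPointInTimeAction.TYPE, request).actionGet().getPointInTimeId();

// The follow-up search triggers one search shards call per remote cluster, which returns all
// shards of the targeted indices, including the filtered-out ones; before this fix, looking
// those shards up in the search context id map built from the PIT id threw an exception.
client.prepareSearch()
    .setQuery(new MatchAllQueryBuilder())
    .setPointInTime(new PointInTimeBuilder(pitId))
    .get();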
javanna committed Jan 15, 2024
1 parent 653086e commit 52d5938
Showing 6 changed files with 161 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/104288.yaml
@@ -0,0 +1,6 @@
pr: 104288
summary: Don't throw error for remote shards that open PIT filtered out
area: Search
type: bug
issues:
- 102596
3 changes: 1 addition & 2 deletions qa/ccs-common-rest/build.gradle
@@ -40,8 +40,7 @@ tasks.named("yamlRestTest") {
'search.aggregation/220_filters_bucket/cache hits', // node_selector?
'search.aggregation/50_filter/Standard queries get cached',
'search.aggregation/50_filter/Terms lookup gets cached', // terms lookup by "index" doesn't seem to work correctly
'search.aggregation/70_adjacency_matrix/Terms lookup', // terms lookup by "index" doesn't seem to work correctly
'search/350_point_in_time/point-in-time with index filter'
'search.aggregation/70_adjacency_matrix/Terms lookup' // terms lookup by "index" doesn't seem to work correctly
].join(',')
}

@@ -23,6 +23,10 @@ setup:
- do:
indices.create:
index: test2
body:
settings:
index:
number_of_shards: 2

- do:
index:
@@ -13,6 +13,7 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.search.builder.PointInTimeBuilder;
@@ -21,6 +22,7 @@
import org.elasticsearch.search.query.ThrowingQueryBuilder;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.RemoteClusterAware;
import org.hamcrest.MatcherAssert;

import java.util.ArrayList;
import java.util.Collection;
@@ -74,11 +76,15 @@ public void testBasic() {
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client(REMOTE_CLUSTER);
int localNumDocs = randomIntBetween(10, 50);
assertAcked(localClient.admin().indices().prepareCreate("local_test"));
assertAcked(
localClient.admin().indices().prepareCreate("local_test").setSettings(Settings.builder().put("index.number_of_shards", 3))
);
indexDocs(localClient, "local_test", localNumDocs);

int remoteNumDocs = randomIntBetween(10, 50);
assertAcked(remoteClient.admin().indices().prepareCreate("remote_test"));
assertAcked(
remoteClient.admin().indices().prepareCreate("remote_test").setSettings(Settings.builder().put("index.number_of_shards", 3))
);
indexDocs(remoteClient, "remote_test", remoteNumDocs);
boolean includeLocalIndex = randomBoolean();
List<String> indices = new ArrayList<>();
@@ -107,19 +113,120 @@ public void testBasic() {

SearchResponse.Clusters clusters = resp.getClusters();
int expectedNumClusters = 1 + (includeLocalIndex ? 1 : 0);
assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL), equalTo(expectedNumClusters));
assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));
MatcherAssert.assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
MatcherAssert.assertThat(
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
equalTo(expectedNumClusters)
);
MatcherAssert.assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));

if (includeLocalIndex) {
SearchResponse.Cluster localCluster = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertNotNull(localCluster);
assertOneSuccessfulShard(localCluster);
assertAllSuccessfulShards(localCluster, 3, 0);
}

SearchResponse.Cluster remoteCluster = clusters.getCluster(REMOTE_CLUSTER);
assertNotNull(remoteCluster);
assertOneSuccessfulShard(remoteCluster);
assertAllSuccessfulShards(remoteCluster, 3, 0);
}
);
} finally {
closePointInTime(pitId);
}
}

public void testOpenPITWithIndexFilter() {
final Client localClient = client(LOCAL_CLUSTER);
final Client remoteClient = client(REMOTE_CLUSTER);

assertAcked(
localClient.admin().indices().prepareCreate("local_test").setSettings(Settings.builder().put("index.number_of_shards", 3))
);
localClient.prepareIndex("local_test").setId("1").setSource("value", "1", "@timestamp", "2024-03-01").get();
localClient.prepareIndex("local_test").setId("2").setSource("value", "2", "@timestamp", "2023-12-01").get();
localClient.admin().indices().prepareRefresh("local_test").get();

assertAcked(
remoteClient.admin().indices().prepareCreate("remote_test").setSettings(Settings.builder().put("index.number_of_shards", 3))
);
remoteClient.prepareIndex("remote_test").setId("1").setSource("value", "1", "@timestamp", "2024-01-01").get();
remoteClient.prepareIndex("remote_test").setId("2").setSource("value", "2", "@timestamp", "2023-12-01").get();
remoteClient.admin().indices().prepareRefresh("remote_test").get();

List<String> indices = new ArrayList<>();
indices.add(randomFrom("*", "local_*", "local_test"));
indices.add(randomFrom("*:*", "remote_cluster:*", "remote_cluster:remote_test"));

OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices.toArray(new String[0]));
request.keepAlive(TimeValue.timeValueMinutes(2));
request.indexFilter(new RangeQueryBuilder("@timestamp").gte("2023-12-15"));
final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
String pitId = response.getPointInTimeId();

if (randomBoolean()) {
localClient.prepareIndex("local_test").setId("local_new").setSource().get();
localClient.admin().indices().prepareRefresh().get();
}
if (randomBoolean()) {
remoteClient.prepareIndex("remote_test").setId("remote_new").setSource().get();
remoteClient.admin().indices().prepareRefresh().get();
}

try {
assertNoFailuresAndResponse(
localClient.prepareSearch()
.setPreference(null)
.setQuery(new MatchAllQueryBuilder())
.setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertHitCount(resp, 2);

SearchResponse.Clusters clusters = resp.getClusters();
int expectedNumClusters = 2;
MatcherAssert.assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
MatcherAssert.assertThat(
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
equalTo(expectedNumClusters)
);
MatcherAssert.assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));

// both indices (local and remote) have shards, but there is a single shard left after can match
SearchResponse.Cluster localCluster = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertNotNull(localCluster);
assertAllSuccessfulShards(localCluster, 1, 0);
SearchResponse.Cluster remoteCluster = clusters.getCluster(REMOTE_CLUSTER);
assertNotNull(remoteCluster);
assertAllSuccessfulShards(remoteCluster, 1, 0);
}
);

assertNoFailuresAndResponse(
localClient.prepareSearch()
.setPreference(null)
// test the scenario where search also runs can match and filters additional shards out
.setPreFilterShardSize(1)
.setQuery(new RangeQueryBuilder("@timestamp").gte("2024-02-01"))
.setPointInTime(new PointInTimeBuilder(pitId)),
resp -> {
assertHitCount(resp, 1);

SearchResponse.Clusters clusters = resp.getClusters();
int expectedNumClusters = 2;
MatcherAssert.assertThat(clusters.getTotal(), equalTo(expectedNumClusters));
MatcherAssert.assertThat(
clusters.getClusterStateCount(SearchResponse.Cluster.Status.SUCCESSFUL),
equalTo(expectedNumClusters)
);
MatcherAssert.assertThat(clusters.getClusterStateCount(SearchResponse.Cluster.Status.SKIPPED), equalTo(0));

// both indices (local and remote) have shards, but there is a single shard left after can match
SearchResponse.Cluster localCluster = clusters.getCluster(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertNotNull(localCluster);
assertAllSuccessfulShards(localCluster, 1, 0);
SearchResponse.Cluster remoteCluster = clusters.getCluster(REMOTE_CLUSTER);
assertNotNull(remoteCluster);
assertAllSuccessfulShards(remoteCluster, 1, 1);
}
);
} finally {
@@ -180,16 +287,6 @@ public void testFailuresOnOneShardsWithPointInTime() throws ExecutionException,
}
}

private static void assertOneSuccessfulShard(SearchResponse.Cluster cluster) {
assertThat(cluster.getTotalShards(), equalTo(1));
assertThat(cluster.getSuccessfulShards(), equalTo(1));
assertThat(cluster.getFailedShards(), equalTo(0));
assertThat(cluster.getFailures().size(), equalTo(0));
assertThat(cluster.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertFalse(cluster.isTimedOut());
}

private static void assertOneFailedShard(SearchResponse.Cluster cluster, int totalShards) {
assertThat(cluster.getSuccessfulShards(), equalTo(totalShards - 1));
assertThat(cluster.getFailedShards(), equalTo(1));
@@ -200,6 +297,17 @@ private static void assertOneFailedShard(SearchResponse.Cluster cluster, int tot
assertFalse(cluster.isTimedOut());
}

private static void assertAllSuccessfulShards(SearchResponse.Cluster cluster, int numShards, int skippedShards) {
assertThat(cluster.getTotalShards(), equalTo(numShards));
assertThat(cluster.getSkippedShards(), equalTo(skippedShards));
assertThat(cluster.getSuccessfulShards(), equalTo(numShards));
assertThat(cluster.getFailedShards(), equalTo(0));
assertThat(cluster.getFailures().size(), equalTo(0));
assertThat(cluster.getStatus(), equalTo(SearchResponse.Cluster.Status.SUCCESSFUL));
assertThat(cluster.getTook().millis(), greaterThanOrEqualTo(0L));
assertFalse(cluster.isTimedOut());
}

private String openPointInTime(String[] indices, TimeValue keepAlive) {
OpenPointInTimeRequest request = new OpenPointInTimeRequest(indices).keepAlive(keepAlive);
final OpenPointInTimeResponse response = client().execute(TransportOpenPointInTimeAction.TYPE, request).actionGet();
@@ -132,6 +132,9 @@ public SearchPhase newSearchPhase(
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
// Note: remote shards are prefiltered via can match as part of search shards. They don't need additional pre-filtering and
// that is signaled to the local can match through the SearchShardIterator#prefiltered flag. Local shards do need to go
// through the local can match phase.
if (SearchService.canRewriteToMatchNone(searchRequest.source())) {
return new CanMatchPreFilterSearchPhase(
logger,
@@ -985,8 +985,12 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(
for (Map.Entry<String, SearchShardsResponse> entry : searchShardsResponses.entrySet()) {
for (SearchShardsGroup group : entry.getValue().getGroups()) {
final ShardId shardId = group.shardId();
final String clusterAlias = entry.getKey();
final SearchContextIdForNode perNode = searchContextId.shards().get(shardId);
if (perNode == null) {
// the shard was skipped after can match, hence it is not even part of the pit id
continue;
}
final String clusterAlias = entry.getKey();
assert clusterAlias.equals(perNode.getClusterAlias()) : clusterAlias + " != " + perNode.getClusterAlias();
final List<String> targetNodes = new ArrayList<>(group.allocatedNodes().size());
targetNodes.add(perNode.getNode());
@@ -1015,9 +1019,26 @@ static List<SearchShardIterator> getRemoteShardsIteratorFromPointInTime(
remoteShardIterators.add(shardIterator);
}
}
assert checkAllRemotePITShardsWereReturnedBySearchShards(searchContextId.shards(), searchShardsResponses)
: "search shards did not return remote shards that PIT included: " + searchContextId.shards();
return remoteShardIterators;
}

private static boolean checkAllRemotePITShardsWereReturnedBySearchShards(
Map<ShardId, SearchContextIdForNode> searchContextIdShards,
Map<String, SearchShardsResponse> searchShardsResponses
) {
Map<ShardId, SearchContextIdForNode> searchContextIdForNodeMap = new HashMap<>(searchContextIdShards);
for (SearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
for (SearchShardsGroup group : searchShardsResponse.getGroups()) {
searchContextIdForNodeMap.remove(group.shardId());
}
}
return searchContextIdForNodeMap.values()
.stream()
.allMatch(searchContextIdForNode -> searchContextIdForNode.getClusterAlias() == null);
}

Index[] resolveLocalIndices(OriginalIndices localIndices, ClusterState clusterState, SearchTimeProvider timeProvider) {
if (localIndices == null) {
return Index.EMPTY_ARRAY; // don't search on any local index (happens when only remote indices were specified)

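To make the essence of the fix easy to see outside the Elasticsearch codebase, here is a simplified,
self-contained sketch with hypothetical types (ShardId and the method below are stand-ins, not the
actual production classes): shards reported by the search shards call but missing from the PIT id
are skipped instead of failing the search.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PitShardSkipSketch {
    // Hypothetical stand-in for the shard identifier used by the real code.
    record ShardId(String index, int id) {}

    // pitShards: shard -> target node, decoded from the PIT id (filtered-out shards are absent).
    // searchShardsResult: every shard of the targeted remote indices, as returned by search shards.
    static List<ShardId> shardsToQuery(Map<ShardId, String> pitShards, List<ShardId> searchShardsResult) {
        List<ShardId> result = new ArrayList<>();
        for (ShardId shardId : searchShardsResult) {
            String node = pitShards.get(shardId);
            if (node == null) {
                // the shard was skipped by open PIT's index_filter, hence it is not part of the PIT id
                continue;
            }
            result.add(shardId);
        }
        return result;
    }
}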