Skip to content

Commit

Permalink
Adjust BWC for node-level field cap requests (#79301)
Browse files Browse the repository at this point in the history
Relates #79212
  • Loading branch information
dnhatn authored Oct 17, 2021
1 parent e86de06 commit 700d01e
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 426 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ tasks.register("verifyVersions") {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = false
boolean bwc_tests_enabled = true
// place a PR link here when committing bwc changes:
String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/79301"
String bwc_tests_disabled_issue = ""
/*
* FIPS 140-2 behavior was fixed in 7.11.0. Before that there is no way to run elasticsearch in a
* JVM that is properly configured to be in fips mode with BCFIPS. For now we need to disable
Expand Down
1 change: 0 additions & 1 deletion rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ tasks.named("yamlRestTestV7CompatTransform").configure { task ->

task.replaceValueInMatch("_type", "_doc")
task.addAllowedWarningRegex("\\[types removal\\].*")
task.addAllowedWarning("[transient settings removal] Updating cluster settings through transientSettings is deprecated. Use persistent settings instead.")
task.replaceValueInMatch("nodes.\$node_id.roles.8", "ml", "node_info role test")
task.replaceValueInMatch("nodes.\$node_id.roles.9", "remote_cluster_client", "node_info role test")
task.removeMatch("nodes.\$node_id.roles.10", "node_info role test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.index.mapper.ObjectMapper;
import org.elasticsearch.index.mapper.RuntimeField;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -42,21 +43,21 @@ class FieldCapabilitiesFetcher {
this.indicesService = indicesService;
}

public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest request) throws IOException {
final ShardId shardId = request.shardId();
FieldCapabilitiesIndexResponse fetch(ShardId shardId, String[] fieldPatterns, QueryBuilder indexFilter,
long nowInMillis, Map<String, Object> runtimeFields) throws IOException {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(request.shardId().getId());
final IndexShard indexShard = indexService.getShard(shardId.getId());
try (Engine.Searcher searcher = indexShard.acquireSearcher(Engine.CAN_MATCH_SEARCH_SOURCE)) {

final SearchExecutionContext searchExecutionContext = indexService.newSearchExecutionContext(shardId.id(), 0,
searcher, request::nowInMillis, null, request.runtimeFields());
searcher, () -> nowInMillis, null, runtimeFields);

if (canMatchShard(request, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(request.index(), Collections.emptyMap(), false);
if (canMatchShard(shardId, indexFilter, nowInMillis, searchExecutionContext) == false) {
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), Collections.emptyMap(), false);
}

Set<String> fieldNames = new HashSet<>();
for (String pattern : request.fields()) {
for (String pattern : fieldPatterns) {
fieldNames.addAll(searchExecutionContext.getMatchingFieldNames(pattern));
}

Expand Down Expand Up @@ -100,17 +101,18 @@ public FieldCapabilitiesIndexResponse fetch(final FieldCapabilitiesIndexRequest
}
}
}
return new FieldCapabilitiesIndexResponse(request.index(), responseMap, true);
return new FieldCapabilitiesIndexResponse(shardId.getIndexName(), responseMap, true);
}
}

private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecutionContext searchExecutionContext) throws IOException {
if (req.indexFilter() == null || req.indexFilter() instanceof MatchAllQueryBuilder) {
private boolean canMatchShard(ShardId shardId, QueryBuilder indexFilter, long nowInMillis,
SearchExecutionContext searchExecutionContext) throws IOException {
if (indexFilter == null || indexFilter instanceof MatchAllQueryBuilder) {
return true;
}
assert req.nowInMillis() != 0L;
ShardSearchRequest searchRequest = new ShardSearchRequest(req.shardId(), req.nowInMillis(), AliasFilter.EMPTY);
searchRequest.source(new SearchSourceBuilder().query(req.indexFilter()));
assert nowInMillis != 0L;
ShardSearchRequest searchRequest = new ShardSearchRequest(shardId, nowInMillis, AliasFilter.EMPTY);
searchRequest.source(new SearchSourceBuilder().query(indexFilter));
return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext);
}

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand Down Expand Up @@ -49,9 +47,6 @@
* Dispatches child field-caps requests to old/new data nodes in the local cluster that have shards of the requesting indices.
*/
final class RequestDispatcher {

static final Version GROUP_REQUESTS_VERSION = Version.V_8_0_0;

static final Logger LOGGER = LogManager.getLogger(RequestDispatcher.class);

private final TransportService transportService;
Expand Down Expand Up @@ -128,7 +123,7 @@ private void innerExecute() {
for (Map.Entry<String, IndexSelector> e : indexSelectors.entrySet()) {
final String index = e.getKey();
final IndexSelector indexSelector = e.getValue();
final List<ShardRouting> selectedShards = indexSelector.nextTarget(clusterState.nodes(), hasFilter);
final List<ShardRouting> selectedShards = indexSelector.nextTarget(hasFilter);
if (selectedShards.isEmpty()) {
failedIndices.add(index);
} else {
Expand Down Expand Up @@ -163,41 +158,18 @@ int executionRound() {
private void sendRequestToNode(String nodeId, List<ShardId> shardIds) {
final DiscoveryNode node = clusterState.nodes().get(nodeId);
assert node != null;
if (node.getVersion().onOrAfter(GROUP_REQUESTS_VERSION)) {
LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds);
final ActionListener<FieldCapabilitiesNodeResponse> listener =
ActionListener.wrap(r -> onRequestResponse(shardIds, r), failure -> onRequestFailure(shardIds, failure));
final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest(
shardIds,
fieldCapsRequest.fields(),
originalIndices,
fieldCapsRequest.indexFilter(),
nowInMillis,
fieldCapsRequest.runtimeFields());
transportService.sendChildRequest(node, TransportFieldCapabilitiesAction.ACTION_NODE_NAME, nodeRequest, parentTask,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new));
} else {
for (ShardId shardId : shardIds) {
LOGGER.debug("round {} sends field caps shard request to node {} for shardId {}", executionRound, node, shardId);
final ActionListener<FieldCapabilitiesIndexResponse> listener = ActionListener.wrap(
r -> {
final FieldCapabilitiesNodeResponse nodeResponse;
if (r.canMatch()) {
nodeResponse = new FieldCapabilitiesNodeResponse(
Collections.singletonList(r), Collections.emptyMap(), Collections.emptySet());
} else {
nodeResponse = new FieldCapabilitiesNodeResponse(Collections.emptyList(), Collections.emptyMap(),
Collections.singleton(shardId));
}
onRequestResponse(Collections.singletonList(shardId), nodeResponse);
},
e -> onRequestFailure(Collections.singletonList(shardId), e));
final FieldCapabilitiesIndexRequest shardRequest = new FieldCapabilitiesIndexRequest(fieldCapsRequest.fields(), shardId,
originalIndices, fieldCapsRequest.indexFilter(), nowInMillis, fieldCapsRequest.runtimeFields());
transportService.sendChildRequest(node, TransportFieldCapabilitiesAction.ACTION_SHARD_NAME, shardRequest, parentTask,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, FieldCapabilitiesIndexResponse::new));
}
}
LOGGER.debug("round {} sends field caps node request to node {} for shardIds {}", executionRound, node, shardIds);
final ActionListener<FieldCapabilitiesNodeResponse> listener =
ActionListener.wrap(r -> onRequestResponse(shardIds, r), failure -> onRequestFailure(shardIds, failure));
final FieldCapabilitiesNodeRequest nodeRequest = new FieldCapabilitiesNodeRequest(
shardIds,
fieldCapsRequest.fields(),
originalIndices,
fieldCapsRequest.indexFilter(),
nowInMillis,
fieldCapsRequest.runtimeFields());
transportService.sendChildRequest(node, TransportFieldCapabilitiesAction.ACTION_NODE_NAME, nodeRequest, parentTask,
TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, FieldCapabilitiesNodeResponse::new));
}

private void afterRequestsCompleted(int numRequests) {
Expand Down Expand Up @@ -274,7 +246,7 @@ synchronized void addUnmatchedShardId(ShardId shardId) {
failures.remove(shardId);
}

synchronized List<ShardRouting> nextTarget(DiscoveryNodes discoveryNodes, boolean withQueryFilter) {
synchronized List<ShardRouting> nextTarget(boolean withQueryFilter) {
if (nodeToShards.isEmpty()) {
return Collections.emptyList();
}
Expand Down Expand Up @@ -306,21 +278,8 @@ synchronized List<ShardRouting> nextTarget(DiscoveryNodes discoveryNodes, boolea
} else {
assert unmatchedShardIds.isEmpty();
final Map.Entry<String, List<ShardRouting>> node = nodeIt.next();
// If the target node is on the new version, then we can ask it to process all its copies in a single request
// and the target node will process at most one valid copy. Otherwise, we should ask for a single copy to avoid
// sending multiple requests.
final DiscoveryNode discoNode = discoveryNodes.get(node.getKey());
if (discoNode.getVersion().onOrAfter(GROUP_REQUESTS_VERSION)) {
nodeIt.remove();
return node.getValue();
} else {
final List<ShardRouting> shards = node.getValue();
final ShardRouting selectedShard = shards.remove(0);
if (shards.isEmpty()) {
nodeIt.remove();
}
return Collections.singletonList(selectedShard);
}
nodeIt.remove();
return node.getValue();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@

public class TransportFieldCapabilitiesAction extends HandledTransportAction<FieldCapabilitiesRequest, FieldCapabilitiesResponse> {
public static final String ACTION_NODE_NAME = FieldCapabilitiesAction.NAME + "[n]";
public static final String ACTION_SHARD_NAME = FieldCapabilitiesAction.NAME + "[index][s]";

private final ThreadPool threadPool;
private final TransportService transportService;
Expand All @@ -72,11 +71,8 @@ public TransportFieldCapabilitiesAction(TransportService transportService,
this.fieldCapabilitiesFetcher = new FieldCapabilitiesFetcher(indicesService);
final Set<String> metadataFields = indicesService.getAllMetadataFields();
this.metadataFieldPred = metadataFields::contains;

transportService.registerRequestHandler(ACTION_NODE_NAME, ThreadPool.Names.MANAGEMENT,
FieldCapabilitiesNodeRequest::new, new NodeTransportHandler());
transportService.registerRequestHandler(ACTION_SHARD_NAME, ThreadPool.Names.SAME,
FieldCapabilitiesIndexRequest::new, new ShardTransportHandler());
}

@Override
Expand Down Expand Up @@ -323,10 +319,9 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
final Map<ShardId, Exception> failures = new HashMap<>();
final Set<ShardId> unmatched = new HashSet<>();
for (ShardId shardId : shardIds) {
final FieldCapabilitiesIndexRequest indexRequest = new FieldCapabilitiesIndexRequest(request.fields(), shardId,
request.originalIndices(), request.indexFilter(), request.nowInMillis(), request.runtimeFields());
try {
final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(indexRequest);
final FieldCapabilitiesIndexResponse response = fieldCapabilitiesFetcher.fetch(
shardId, request.fields(), request.indexFilter(), request.nowInMillis(), request.runtimeFields());
if (response.canMatch()) {
unmatched.clear();
failures.clear();
Expand All @@ -346,13 +341,4 @@ public void messageReceived(FieldCapabilitiesNodeRequest request, TransportChann
});
}
}

private class ShardTransportHandler implements TransportRequestHandler<FieldCapabilitiesIndexRequest> {
@Override
public void messageReceived(FieldCapabilitiesIndexRequest request, TransportChannel channel, Task task) throws Exception {
ActionListener<FieldCapabilitiesIndexResponse> listener = new ChannelActionListener<>(channel, ACTION_SHARD_NAME, request);
ActionListener.completeWith(listener, () -> fieldCapabilitiesFetcher.fetch(request));
}
}

}
Loading

0 comments on commit 700d01e

Please sign in to comment.