Skip to content

Commit

Permalink
Break up and simplify TransportFieldCapabilitiesAction (elastic#76958)
Browse files Browse the repository at this point in the history
`TransportFieldCapabilitiesAction` currently holds a lot of logic. This PR
breaks it up into smaller pieces and simplifies its large `doExecute` method.
Simplifying the class will help before we start to make field caps
optimizations.

Changes:
* Factor some methods out of `doExecute` to reduce its length
* Streamline index block checking

This backport doesn't include the change "Pull AsyncShardsAction out into its
own class", since it's already part of a separate class in 7.x.
  • Loading branch information
jtibshirani committed Aug 30, 2021
1 parent 4079bc9 commit 13e86e3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -78,41 +79,18 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
} else {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
}

checkIndexBlocks(clusterState, concreteIndices);

final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size();
if (totalNumRequest == 0) {
listener.onResponse(new FieldCapabilitiesResponse(new String[0], Collections.emptyMap()));
return;
}

final CountDown completionCounter = new CountDown(totalNumRequest);
final List<FieldCapabilitiesIndexResponse> indexResponses = Collections.synchronizedList(new ArrayList<>());
final FailureCollector indexFailures = new FailureCollector();
final Runnable countDown = () -> {
if (completionCounter.countDown()) {
List<FieldCapabilitiesFailure> failures = indexFailures.values();
if (indexResponses.size() > 0) {
if (request.isMergeResults()) {
// fork off to the management pool for merging the responses as the operation can run for longer than is acceptable
// on a transport thread in case of large numbers of indices and/or fields
threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(
ActionRunnable.supply(
listener,
() -> merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures)))
);
} else {
listener.onResponse(new FieldCapabilitiesResponse(indexResponses, new ArrayList<>(failures)));
}
} else {
// we have no responses at all, maybe because of errors
if (indexFailures.size() > 0) {
// throw back the first exception
listener.onFailure(failures.iterator().next().getException());
} else {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
}
}
}
};
final Runnable countDown = createResponseMerger(request, totalNumRequest, indexResponses, indexFailures, listener);

if (concreteIndices.length > 0) {
// fork this action to the management pool as it can fan out to a large number of child requests that get handled on SAME and
Expand All @@ -123,14 +101,7 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
new TransportFieldCapabilitiesIndexAction.AsyncShardsAction(
transportService,
clusterService,
new FieldCapabilitiesIndexRequest(
request.fields(),
index,
localIndices,
request.indexFilter(),
nowInMillis,
request.runtimeFields()
),
prepareLocalIndexRequest(request, index, localIndices, nowInMillis),
new ActionListener<FieldCapabilitiesIndexResponse>() {
@Override
public void onResponse(FieldCapabilitiesIndexResponse result) {
Expand All @@ -157,14 +128,7 @@ public void onFailure(Exception e) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
Client remoteClusterClient = transportService.getRemoteClusterService().getRemoteClusterClient(threadPool, clusterAlias);
FieldCapabilitiesRequest remoteRequest = new FieldCapabilitiesRequest();
remoteRequest.setMergeResults(false); // we need to merge on this node
remoteRequest.indicesOptions(originalIndices.indicesOptions());
remoteRequest.indices(originalIndices.indices());
remoteRequest.fields(request.fields());
remoteRequest.runtimeFields(request.runtimeFields());
remoteRequest.indexFilter(request.indexFilter());
remoteRequest.nowInMillis(nowInMillis);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(request, originalIndices, nowInMillis);
remoteClusterClient.fieldCaps(remoteRequest, ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
indexResponses.add(
Expand All @@ -188,6 +152,69 @@ public void onFailure(Exception e) {
}
}

/**
 * Fails fast with a {@code ClusterBlockException} if field caps reads are blocked,
 * either cluster-wide or on any of the resolved concrete indices.
 */
private void checkIndexBlocks(ClusterState clusterState, String[] concreteIndices) {
    // Cluster-level READ block rejects the whole request up front.
    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
    // Then verify each concrete index individually allows reads.
    for (int i = 0; i < concreteIndices.length; i++) {
        clusterState.blocks().indexBlockedRaiseException(ClusterBlockLevel.READ, concreteIndices[i]);
    }
}

/**
 * Builds the completion callback shared by all per-index and per-cluster requests.
 * Each invocation counts down; only the final invocation assembles and delivers the
 * response (or the first collected failure) to {@code listener}.
 */
private Runnable createResponseMerger(FieldCapabilitiesRequest request,
                                      int totalNumRequests,
                                      List<FieldCapabilitiesIndexResponse> indexResponses,
                                      FailureCollector indexFailures,
                                      ActionListener<FieldCapabilitiesResponse> listener) {
    final CountDown completionCounter = new CountDown(totalNumRequests);
    return () -> {
        if (completionCounter.countDown() == false) {
            // Not the last outstanding request yet; nothing to do.
            return;
        }
        List<FieldCapabilitiesFailure> failures = indexFailures.values();
        if (indexResponses.isEmpty()) {
            // No responses at all, possibly because every request failed.
            if (indexFailures.size() > 0) {
                // Surface the first recorded exception to the caller.
                listener.onFailure(failures.iterator().next().getException());
            } else {
                listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
            }
            return;
        }
        if (request.isMergeResults()) {
            // Merging can be expensive with many indices/fields, so fork off to the
            // management pool rather than blocking a transport thread.
            threadPool.executor(ThreadPool.Names.MANAGEMENT).submit(
                ActionRunnable.supply(
                    listener,
                    () -> merge(indexResponses, request.includeUnmapped(), new ArrayList<>(failures)))
            );
        } else {
            listener.onResponse(new FieldCapabilitiesResponse(indexResponses, new ArrayList<>(failures)));
        }
    };
}

/**
 * Builds the single-index request sent to a local concrete index, carrying over the
 * caller's field selection, index filter and runtime fields unchanged.
 */
private static FieldCapabilitiesIndexRequest prepareLocalIndexRequest(FieldCapabilitiesRequest request,
                                                                      String index,
                                                                      OriginalIndices originalIndices,
                                                                      long nowInMillis) {
    return new FieldCapabilitiesIndexRequest(
        request.fields(),
        index,
        originalIndices,
        request.indexFilter(),
        nowInMillis,
        request.runtimeFields()
    );
}

/**
 * Builds the request forwarded to a remote cluster for its slice of the original
 * indices. The remote side returns unmerged per-index responses; merging happens
 * back on this node.
 */
private static FieldCapabilitiesRequest prepareRemoteRequest(FieldCapabilitiesRequest request,
                                                             OriginalIndices originalIndices,
                                                             long nowInMillis) {
    FieldCapabilitiesRequest outbound = new FieldCapabilitiesRequest();
    // Merging is performed locally on the coordinating node, not remotely.
    outbound.setMergeResults(false);
    // Target only the indices (and options) that resolved to this remote cluster.
    outbound.indices(originalIndices.indices());
    outbound.indicesOptions(originalIndices.indicesOptions());
    // Propagate the caller's field patterns, filter, runtime section and timestamp.
    outbound.fields(request.fields());
    outbound.indexFilter(request.indexFilter());
    outbound.runtimeFields(request.runtimeFields());
    outbound.nowInMillis(nowInMillis);
    return outbound;
}

private FieldCapabilitiesResponse merge(
List<FieldCapabilitiesIndexResponse> indexResponses,
boolean includeUnmapped,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,21 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.MappedFieldType;
Expand Down Expand Up @@ -69,8 +66,6 @@ public class TransportFieldCapabilitiesIndexAction

private static final String ACTION_NAME = FieldCapabilitiesAction.NAME + "[index]";
private static final String ACTION_SHARD_NAME = ACTION_NAME + "[s]";
public static final ActionType<FieldCapabilitiesIndexResponse> TYPE =
new ActionType<>(ACTION_NAME, FieldCapabilitiesIndexResponse::new);

private final ClusterService clusterService;
private final TransportService transportService;
Expand Down Expand Up @@ -166,14 +161,6 @@ private boolean canMatchShard(FieldCapabilitiesIndexRequest req, SearchExecution
return SearchService.queryStillMatchesAfterRewrite(searchRequest, searchExecutionContext);
}

private static ClusterBlockException checkGlobalBlock(ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.READ);
}

private static ClusterBlockException checkRequestBlock(ClusterState state, String concreteIndex) {
return state.blocks().indexBlockedException(ClusterBlockLevel.READ, concreteIndex);
}

/**
* An action that executes on each shard sequentially until it finds one that can match the provided
* {@link FieldCapabilitiesIndexRequest#indexFilter()}. In which case the shard is used
Expand All @@ -200,17 +187,7 @@ public AsyncShardsAction(TransportService transportService,
logger.trace("executing [{}] based on cluster state version [{}]", request, clusterState.version());
}
nodes = clusterState.nodes();
ClusterBlockException blockException = checkGlobalBlock(clusterState);
if (blockException != null) {
throw blockException;
}

this.request = request;
blockException = checkRequestBlock(clusterState, request.index());
if (blockException != null) {
throw blockException;
}

shardsIt = clusterService.operationRouting().searchShards(clusterService.state(),
new String[]{request.index()}, null, null, null, null);
}
Expand Down

0 comments on commit 13e86e3

Please sign in to comment.