Skip to content

Commit

Permalink
Some Enhancements to ActionListener (#69103) (#69409)
Browse files Browse the repository at this point in the history
This adds best effort `toString` rendering the various wrapping
action listeners to make `TRACE` logging, that will currently only print the
top level listener `toString` which isn't helpful to find the original of a listener
in case of wrapped listeners, more useful (e.g. when logging rejected executions).
Also this change makes the `delegateX` methods less verbose to use and makes use of them
in a few spots where they weren't yet used.
  • Loading branch information
original-brownbear authored Feb 23, 2021
1 parent 5e70eae commit 4368b12
Show file tree
Hide file tree
Showing 56 changed files with 419 additions and 564 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.node.NodeClient;
Expand Down Expand Up @@ -57,22 +56,14 @@ protected void doExecute(Task task, SearchTemplateRequest request, ActionListene
try {
SearchRequest searchRequest = convert(request, response, scriptService, xContentRegistry);
if (searchRequest != null) {
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
try {
response.setResponse(searchResponse);
listener.onResponse(response);
} catch (Exception t) {
listener.onFailure(t);
}
client.search(searchRequest, listener.delegateFailure((l, searchResponse) -> {
try {
response.setResponse(searchResponse);
l.onResponse(response);
} catch (Exception t) {
l.onFailure(t);
}

@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
}));
} else {
listener.onResponse(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
Expand Down Expand Up @@ -63,8 +62,7 @@ public class TransportRankEvalAction extends HandledTransportAction<RankEvalRequ
@Inject
public TransportRankEvalAction(ActionFilters actionFilters, Client client, TransportService transportService,
ScriptService scriptService, NamedXContentRegistry namedXContentRegistry) {
super(RankEvalAction.NAME, transportService, actionFilters,
(Writeable.Reader<RankEvalRequest>) RankEvalRequest::new);
super(RankEvalAction.NAME, transportService, actionFilters, RankEvalRequest::new);
this.scriptService = scriptService;
this.namedXContentRegistry = namedXContentRegistry;
this.client = client;
Expand Down Expand Up @@ -126,17 +124,16 @@ LoggingDeprecationHandler.INSTANCE, new BytesArray(resolvedRequest), XContentTyp
ratedRequestsInSearch.toArray(new RatedRequest[ratedRequestsInSearch.size()]), errors));
}

class RankEvalActionListener implements ActionListener<MultiSearchResponse> {
static class RankEvalActionListener extends ActionListener.Delegating<MultiSearchResponse, RankEvalResponse> {

private final ActionListener<RankEvalResponse> listener;
private final RatedRequest[] specifications;

private final Map<String, Exception> errors;
private final EvaluationMetric metric;

RankEvalActionListener(ActionListener<RankEvalResponse> listener, EvaluationMetric metric, RatedRequest[] specifications,
Map<String, Exception> errors) {
this.listener = listener;
super(listener);
this.metric = metric;
this.errors = errors;
this.specifications = specifications;
Expand All @@ -157,12 +154,7 @@ public void onResponse(MultiSearchResponse multiSearchResponse) {
}
responsePosition++;
}
listener.onResponse(new RankEvalResponse(this.metric.combine(responseDetails.values()), responseDetails, this.errors));
}

@Override
public void onFailure(Exception exception) {
listener.onFailure(exception);
delegate.onResponse(new RankEvalResponse(this.metric.combine(responseDetails.values()), responseDetails, this.errors));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,8 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void startSlicedAc
Client client,
DiscoveryNode node,
Runnable workerAction) {
initTaskState(task, request, client, new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
executeSlicedAction(task, request, action, listener, client, node, workerAction);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
initTaskState(task, request, client, listener.delegateFailure(
(l, v) -> executeSlicedAction(task, request, action, l, client, node, workerAction)));
}

/**
Expand Down Expand Up @@ -114,18 +105,10 @@ static <Request extends AbstractBulkByScrollRequest<Request>> void initTaskState
if (configuredSlices == AbstractBulkByScrollRequest.AUTO_SLICES) {
ClusterSearchShardsRequest shardsRequest = new ClusterSearchShardsRequest();
shardsRequest.indices(request.getSearchRequest().indices());
client.admin().cluster().searchShards(shardsRequest, new ActionListener<ClusterSearchShardsResponse>() {
@Override
public void onResponse(ClusterSearchShardsResponse response) {
setWorkerCount(request, task, countSlicesBasedOnShards(response));
listener.onResponse(null);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
client.admin().cluster().searchShards(shardsRequest, listener.delegateFailure((l, response) -> {
setWorkerCount(request, task, countSlicesBasedOnShards(response));
l.onResponse(null);
}));
} else {
setWorkerCount(request, task, configuredSlices);
listener.onResponse(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,8 @@ protected TransportReindexAction(String name, Settings settings, ThreadPool thre
protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
validate(request);
BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;
reindexer.initTask(bulkByScrollTask, request, new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
reindexer.execute(bulkByScrollTask, request, getBulkClient(), listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
reindexer.initTask(bulkByScrollTask, request,
listener.delegateFailure((l, v) -> reindexer.execute(bulkByScrollTask, request, getBulkClient(), l)));
}

/**
Expand Down
Loading

0 comments on commit 4368b12

Please sign in to comment.