Skip to content

Commit

Permalink
Small optimisation in ClearScrollController (#92799)
Browse files Browse the repository at this point in the history
Today `ClearScrollController#closeContexts` uses a
`GroupedActionListener` which accumulates its results in an
`AtomicArray<Boolean>`, and then at the end it counts all the `TRUE`
entries to complete the final listener. There's no need for this `O(N)`
memory usage, we can just count the successes as they come in.
  • Loading branch information
DaveCTurner authored Jan 10, 2023
1 parent 9096f43 commit 5316222
Showing 1 changed file with 26 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -154,42 +154,39 @@ public static void closeContexts(
Collection<SearchContextIdForNode> contextIds,
ActionListener<Integer> listener
) {
if (contextIds.isEmpty()) {
listener.onResponse(0);
return;
}
final Set<String> clusters = contextIds.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
.collect(Collectors.toSet());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();
if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
if (clusters.isEmpty()) {
lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
} else {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
}
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
contextIds.size(),
listener.map(rs -> Math.toIntExact(rs.stream().filter(r -> r).count()))
);
for (SearchContextIdForNode contextId : contextIds) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onResponse(false);
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendFreeContext(
connection,
contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false))
);
} catch (Exception e) {
groupedListener.onResponse(false);
lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
final var successes = new AtomicInteger();
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
for (SearchContextIdForNode contextId : contextIds) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node != null) {
try {
searchTransportService.sendFreeContext(
searchTransportService.getConnection(contextId.getClusterAlias(), node),
contextId.getSearchContextId(),
refs.acquireListener().map(r -> {
if (r.isFreed()) {
successes.incrementAndGet();
}
return null;
})
);
} catch (Exception e) {
// ignored
}
}
}
}
}, listener::onFailure);
}));
}
}

0 comments on commit 5316222

Please sign in to comment.