diff --git a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index 8a19876777ded..9514785450f68 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -10,7 +10,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.StepListener; -import org.elasticsearch.action.support.GroupedActionListener; +import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; @@ -154,42 +154,39 @@ public static void closeContexts( Collection contextIds, ActionListener listener ) { - if (contextIds.isEmpty()) { - listener.onResponse(0); - return; - } final Set clusters = contextIds.stream() - .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) .map(SearchContextIdForNode::getClusterAlias) + .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false) .collect(Collectors.toSet()); final StepListener> lookupListener = new StepListener<>(); - if (clusters.isEmpty() == false) { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } else { + if (clusters.isEmpty()) { lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId)); + } else { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); } - lookupListener.whenComplete(nodeLookup -> { - final GroupedActionListener groupedListener = new GroupedActionListener<>( - contextIds.size(), - listener.map(rs -> Math.toIntExact(rs.stream().filter(r -> r).count())) - ); - for (SearchContextIdForNode contextId : contextIds) { - final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); - if (node == null) { - groupedListener.onResponse(false); - } else { - try { - final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node); - searchTransportService.sendFreeContext( - connection, - contextId.getSearchContextId(), - ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)) - ); - } catch (Exception e) { - groupedListener.onResponse(false); + lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> { + final var successes = new AtomicInteger(); + try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { + for (SearchContextIdForNode contextId : contextIds) { + final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); + if (node != null) { + try { + searchTransportService.sendFreeContext( + searchTransportService.getConnection(contextId.getClusterAlias(), node), + contextId.getSearchContextId(), + refs.acquireListener().map(r -> { + if (r.isFreed()) { + successes.incrementAndGet(); + } + return null; + }) + ); + } catch (Exception e) { + // ignored + } } } } - }, listener::onFailure); + })); } }