Skip to content

Commit

Permalink
Optimize TransportNodesAction to not send DiscoveryNodes for NodeStat…
Browse files Browse the repository at this point in the history
…s, NodesInfo and ClusterStats call

Signed-off-by: Pranshu Shukla <[email protected]>
  • Loading branch information
Pranshu-S committed Jul 12, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 93d507a commit d2ef37d
Showing 13 changed files with 454 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -129,7 +129,7 @@ protected NodeInfo nodeOperation(NodeInfoRequest nodeRequest) {
*/
public static class NodeInfoRequest extends TransportRequest {

NodesInfoRequest request;
protected NodesInfoRequest request;

public NodeInfoRequest(StreamInput in) throws IOException {
super(in);
Original file line number Diff line number Diff line change
@@ -140,7 +140,7 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
*/
public static class NodeStatsRequest extends TransportRequest {

NodesStatsRequest request;
protected NodesStatsRequest request;

public NodeStatsRequest(StreamInput in) throws IOException {
super(in);
Original file line number Diff line number Diff line change
@@ -223,7 +223,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
*/
public static class ClusterStatsNodeRequest extends TransportRequest {

ClusterStatsRequest request;
protected ClusterStatsRequest request;

public ClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
Original file line number Diff line number Diff line change
@@ -65,6 +65,14 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
* will be ignored and this will be used.
* */
private DiscoveryNode[] concreteNodes;

/**
* Since do not use the discovery nodes coming from the request in all code paths following a request extended off from
* BaseNodeRequest, we do not require it to sent around across all nodes.
*
* Setting default behavior as `true` but can be explicitly changed in requests that do not require.
*/
private boolean populateDiscoveryNodesInTransportRequest = true;
private final TimeValue DEFAULT_TIMEOUT_SECS = TimeValue.timeValueSeconds(30);

private TimeValue timeout;
@@ -119,6 +127,14 @@ public void setConcreteNodes(DiscoveryNode[] concreteNodes) {
this.concreteNodes = concreteNodes;
}

public void populateDiscoveryNodesInTransportRequest(boolean value) {
populateDiscoveryNodesInTransportRequest = value;
}

public boolean populateDiscoveryNodesInTransportRequest() {
return populateDiscoveryNodesInTransportRequest;
}

@Override
public ActionRequestValidationException validate() {
return null;
Original file line number Diff line number Diff line change
@@ -209,6 +209,15 @@ protected void resolveRequest(NodesRequest request, ClusterState clusterState) {
request.setConcreteNodes(Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new));
}

/**
* Return the concrete nodes from the request node ids which will be later used for routing requests to nodes.
**/
protected DiscoveryNode[] resolveConcreteNodes(NodesRequest request, ClusterState clusterState) {
assert request.concreteNodes() == null : "request concreteNodes shouldn't be set";
String[] nodesIds = clusterState.nodes().resolveNodes(request.nodesIds());
return Arrays.stream(nodesIds).map(clusterState.nodes()::get).toArray(DiscoveryNode[]::new);
}

/**
* Get a backwards compatible transport action name
*/
@@ -226,6 +235,7 @@ class AsyncAction {
private final NodesRequest request;
private final ActionListener<NodesResponse> listener;
private final AtomicReferenceArray<Object> responses;
private final DiscoveryNode[] concreteNodes;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;

@@ -234,14 +244,27 @@ class AsyncAction {
this.request = request;
this.listener = listener;
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
if (request.populateDiscoveryNodesInTransportRequest()) {
resolveRequest(request, clusterService.state());
assert request.concreteNodes() != null;
this.concreteNodes = null;
} else {
this.concreteNodes = resolveConcreteNodes(request, clusterService.state());
assert request.concreteNodes() == null;
}
} else {
this.concreteNodes = null;
}
if (request.concreteNodes() == null) {
assert concreteNodes != null;
this.responses = new AtomicReferenceArray<>(concreteNodes.length);
} else {
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
}

void start() {
final DiscoveryNode[] nodes = request.concreteNodes();
final DiscoveryNode[] nodes = request.concreteNodes() != null ? request.concreteNodes() : concreteNodes;
if (nodes.length == 0) {
// nothing to notify
threadPool.generic().execute(() -> listener.onResponse(newResponse(request, responses)));
@@ -260,7 +283,6 @@ void start() {
if (task != null) {
nodeRequest.setParentTask(clusterService.localNode().getId(), task.getId());
}

transportService.sendRequest(
node,
getTransportNodeAction(node),
Original file line number Diff line number Diff line change
@@ -66,6 +66,7 @@ public String getName() {
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
ClusterStatsRequest clusterStatsRequest = new ClusterStatsRequest().nodesIds(request.paramAsStringArray("nodeId", null));
clusterStatsRequest.timeout(request.param("timeout"));
clusterStatsRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
}

Original file line number Diff line number Diff line change
@@ -88,7 +88,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final NodesInfoRequest nodesInfoRequest = prepareRequest(request);
nodesInfoRequest.timeout(request.param("timeout"));
settingsFilter.addFilterSettingParams(request);

nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
return channel -> client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
}

Original file line number Diff line number Diff line change
@@ -232,6 +232,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
// If no levels are passed in this results in an empty array.
String[] levels = Strings.splitStringByCommaToArray(request.param("level"));
nodesStatsRequest.indices().setLevels(levels);
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);

return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
}
Original file line number Diff line number Diff line change
@@ -125,6 +125,7 @@ public RestChannelConsumer doCatRequest(final RestRequest request, final NodeCli
public void processResponse(final ClusterStateResponse clusterStateResponse) {
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.timeout(request.param("timeout"));
nodesInfoRequest.populateDiscoveryNodesInTransportRequest(false);
nodesInfoRequest.clear()
.addMetrics(
NodesInfoRequest.Metric.JVM.metricName(),
@@ -137,6 +138,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) {
public void processResponse(final NodesInfoResponse nodesInfoResponse) {
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
nodesStatsRequest.timeout(request.param("timeout"));
nodesStatsRequest.populateDiscoveryNodesInTransportRequest(false);
nodesStatsRequest.clear()
.indices(true)
.addMetrics(
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.action.admin.cluster.stats.TransportClusterStatsAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.test.transport.CapturingTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransportClusterStatsActionTests extends TransportNodesActionTests {

/**
* By default, we send discovery nodes list to each request that is sent across from the coordinator node. This
* behavior is asserted in this test.
*/
public void testDefaultBehavior() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(true);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> {
assertNotNull(sentRequest.getDiscoveryNodes());
assertEquals(sentRequest.getDiscoveryNodes().length, clusterService.state().nodes().getSize());
});
});
}

/**
* In the optimized ClusterStats Request, we do not send the DiscoveryNodes List to each node. This behavior is
* asserted in this test.
*/
public void testOptimizedBehavior() {
ClusterStatsRequest request = new ClusterStatsRequest();
request.populateDiscoveryNodesInTransportRequest(false);
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = performNodesInfoAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

private Map<String, List<MockClusterStatsNodeRequest>> performNodesInfoAction(ClusterStatsRequest request) {
TransportNodesAction action = getTestTransportClusterStatsAction();
PlainActionFuture<NodesStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<MockClusterStatsNodeRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<MockClusterStatsNodeRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
TransportClusterStatsAction.ClusterStatsNodeRequest clusterStatsNodeRequestFromCoordinator =
(TransportClusterStatsAction.ClusterStatsNodeRequest) preSentRequest.request;
clusterStatsNodeRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
MockClusterStatsNodeRequest mockClusterStatsNodeRequest = new MockClusterStatsNodeRequest(in);
sentRequestList.add(mockClusterStatsNodeRequest);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

combinedSentRequest.put(node, sentRequestList);
});

return combinedSentRequest;
}

public TestTransportClusterStatsAction getTestTransportClusterStatsAction() {
return new TestTransportClusterStatsAction(
THREAD_POOL,
clusterService,
transportService,
nodeService,
indicesService,
new ActionFilters(Collections.emptySet())
);
}

private static class TestTransportClusterStatsAction extends TransportClusterStatsAction {
public TestTransportClusterStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
NodeService nodeService,
IndicesService indicesService,
ActionFilters actionFilters
) {
super(threadPool, clusterService, transportService, nodeService, indicesService, actionFilters);
}
}

private static class MockClusterStatsNodeRequest extends TransportClusterStatsAction.ClusterStatsNodeRequest {

public MockClusterStatsNodeRequest(StreamInput in) throws IOException {
super(in);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
@@ -46,6 +46,8 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.transport.CapturingTransport;
@@ -76,11 +78,12 @@

public class TransportNodesActionTests extends OpenSearchTestCase {

private static ThreadPool THREAD_POOL;

private ClusterService clusterService;
private CapturingTransport transport;
private TransportService transportService;
protected static ThreadPool THREAD_POOL;
protected ClusterService clusterService;
protected CapturingTransport transport;
protected TransportService transportService;
protected NodeService nodeService;
protected IndicesService indicesService;

public void testRequestIsSentToEachNode() throws Exception {
TransportNodesAction action = getTestTransportNodesAction();
Loading

0 comments on commit d2ef37d

Please sign in to comment.