Skip to content

Commit

Permalink
Reuse local node in async shard fetch responses (#77991)
Browse files Browse the repository at this point in the history
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch adds a `DiscoveryNode` parameter to the response
deserialisation method and makes sure that the worst offenders re-use
the local object rather than creating a fresh one:

- `TransportNodesListShardStoreMetadata`
- `TransportNodesListGatewayStartedShards`

Relates #77266
  • Loading branch information
DaveCTurner authored Sep 20, 2021
1 parent e1a219c commit 5486783
Show file tree
Hide file tree
Showing 36 changed files with 112 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -49,7 +50,7 @@ protected NodeRequest newNodeRequest(NodesHotThreadsRequest request) {
}

@Override
protected NodeHotThreads newNodeResponse(StreamInput in) throws IOException {
protected NodeHotThreads newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeHotThreads(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -52,7 +53,7 @@ protected NodeInfoRequest newNodeRequest(NodesInfoRequest request) {
}

@Override
protected NodeInfo newNodeResponse(StreamInput in) throws IOException {
protected NodeInfo newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeInfo(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected NodeRequest newNodeRequest(NodesReloadSecureSettingsRequest request) {
}

@Override
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodesReloadSecureSettingsResponse.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodesReloadSecureSettingsResponse.NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -54,7 +55,7 @@ protected NodeStatsRequest newNodeRequest(NodesStatsRequest request) {
}

@Override
protected NodeStats newNodeResponse(StreamInput in) throws IOException {
protected NodeStats newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeStats(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -55,7 +56,7 @@ protected NodeUsageRequest newNodeRequest(NodesUsageRequest request) {
}

@Override
protected NodeUsage newNodeResponse(StreamInput in) throws IOException {
protected NodeUsage newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeUsage(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeSnapshotStatus newNodeResponse(StreamInput in) throws IOException {
protected NodeSnapshotStatus newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeSnapshotStatus(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -119,7 +120,7 @@ protected ClusterStatsNodeRequest newNodeRequest(ClusterStatsRequest request) {
}

@Override
protected ClusterStatsNodeResponse newNodeResponse(StreamInput in) throws IOException {
protected ClusterStatsNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new ClusterStatsNodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ protected NodeFindDanglingIndexRequest newNodeRequest(FindDanglingIndexRequest r
}

@Override
protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeFindDanglingIndexResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeFindDanglingIndexResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected NodeListDanglingIndicesRequest newNodeRequest(ListDanglingIndicesReque
}

@Override
protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeListDanglingIndicesResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeListDanglingIndicesResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;
Expand All @@ -20,11 +21,34 @@
*/
public abstract class BaseNodeResponse extends TransportResponse {

private DiscoveryNode node;
private final DiscoveryNode node;

protected BaseNodeResponse(StreamInput in) throws IOException {
/**
* Read a response from the given stream, re-using the given {@link DiscoveryNode} object if non-null.
*
* On the wire a {@link BaseNodeResponse} message starts with a {@link DiscoveryNode} identifying the original responder. If the sender
* knows the identity of the responder already then we prefer to use that rather than reading the object from the wire, since {@link
* DiscoveryNode} objects are sometimes quite large and yet they're immutable so there's no need to have multiple copies in memory.
*
* @param node the expected remote node, or {@code null} if not known.
*/
protected BaseNodeResponse(StreamInput in, @Nullable DiscoveryNode node) throws IOException {
super(in);
node = new DiscoveryNode(in);
final DiscoveryNode remoteNode = new DiscoveryNode(in);
if (node == null) {
this.node = remoteNode;
} else {
assert remoteNode.equals(node) : remoteNode + " vs " + node;
this.node = node;
}
}

/**
* Read a response from the given stream, with no {@link DiscoveryNode} object re-use. Callers should not use this constructor if the
* local node is known, and instead should call {@link #BaseNodeResponse(StreamInput, DiscoveryNode)}.
*/
protected BaseNodeResponse(StreamInput in) throws IOException {
this(in, null);
}

protected BaseNodeResponse(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ protected void newResponseAsync(

protected abstract NodeRequest newNodeRequest(NodesRequest request);

protected abstract NodeResponse newNodeResponse(StreamInput in) throws IOException;
protected abstract NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException;

protected abstract NodeResponse nodeOperation(NodeRequest request, Task task);

Expand Down Expand Up @@ -218,7 +218,7 @@ void start() {
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return newNodeResponse(in);
return newNodeResponse(in, node);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.transport.ReceiveTimeoutTransportException;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand All @@ -34,6 +35,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;

Expand Down Expand Up @@ -282,6 +285,7 @@ void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
assert assertSameNodes(response);
processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
}

Expand All @@ -293,6 +297,17 @@ public void onFailure(Exception e) {
}
processAsyncFetch(null, failures, fetchingRound);
}

private boolean assertSameNodes(BaseNodesResponse<T> response) {
final Map<String, DiscoveryNode> nodesById
= Arrays.stream(nodes).collect(Collectors.toMap(DiscoveryNode::getEphemeralId, Function.identity()));
for (T nodeResponse : response.getNodes()) {
final DiscoveryNode responseNode = nodeResponse.getNode();
final DiscoveryNode localNode = nodesById.get(responseNode.getEphemeralId());
assert localNode == responseNode : "not reference equal: " + localNode + " vs " + responseNode;
}
return true;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeGatewayMetaState newNodeResponse(StreamInput in) throws IOException {
protected NodeGatewayMetaState newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeGatewayMetaState(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeGatewayStartedShards newNodeResponse(StreamInput in) throws IOException {
return new NodeGatewayStartedShards(in);
protected NodeGatewayStartedShards newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
final NodeGatewayStartedShards response = new NodeGatewayStartedShards(in, node);
assert response.getNode() == node;
return response;
}

@Override
Expand Down Expand Up @@ -271,7 +273,11 @@ public static class NodeGatewayStartedShards extends BaseNodeResponse {
private final Exception storeException;

public NodeGatewayStartedShards(StreamInput in) throws IOException {
super(in);
this(in, null);
}

public NodeGatewayStartedShards(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
allocationId = in.readOptionalString();
primary = in.readBoolean();
if (in.readBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ protected NodeRequest newNodeRequest(Request request) {
}

@Override
protected NodeStoreFilesMetadata newNodeResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadata(in);
protected NodeStoreFilesMetadata newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
final NodeStoreFilesMetadata nodeStoreFilesMetadata = new NodeStoreFilesMetadata(in, node);
assert nodeStoreFilesMetadata.getNode() == node;
return nodeStoreFilesMetadata;
}

@Override
Expand Down Expand Up @@ -345,10 +347,10 @@ public String getCustomDataPath() {

public static class NodeStoreFilesMetadata extends BaseNodeResponse {

private StoreFilesMetadata storeFilesMetadata;
private final StoreFilesMetadata storeFilesMetadata;

public NodeStoreFilesMetadata(StreamInput in) throws IOException {
super(in);
public NodeStoreFilesMetadata(StreamInput in, DiscoveryNode node) throws IOException {
super(in, node);
storeFilesMetadata = new StoreFilesMetadata(in);
}

Expand All @@ -362,7 +364,7 @@ public StoreFilesMetadata storeFilesMetadata() {
}

public static NodeStoreFilesMetadata readListShardStoreNodeOperationResponse(StreamInput in) throws IOException {
return new NodeStoreFilesMetadata(in);
return new NodeStoreFilesMetadata(in, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ protected NodesResponse newResponse(NodesRequest request, List<NodeResponse> res
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ protected NodeRequest newNodeRequest(NodesRequest request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.action.support.tasks.BaseTasksRequest;
import org.elasticsearch.action.support.tasks.BaseTasksResponse;
import org.elasticsearch.action.support.tasks.TransportTasksAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -143,7 +144,7 @@ protected NodeRequest newNodeRequest(NodesRequest request) {
}

@Override
protected NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodeResponse(in);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ protected TestNodeRequest newNodeRequest(TestNodesRequest request) {
}

@Override
protected TestNodeResponse newNodeResponse(StreamInput in) throws IOException {
protected TestNodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new TestNodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected AnalyticsStatsAction.NodeRequest newNodeRequest(AnalyticsStatsAction.R
}

@Override
protected AnalyticsStatsAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected AnalyticsStatsAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new AnalyticsStatsAction.NodeResponse(in);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -57,7 +58,7 @@ protected NodesDeprecationCheckAction.NodeRequest newNodeRequest(NodesDeprecatio
}

@Override
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse(StreamInput in) throws IOException {
protected NodesDeprecationCheckAction.NodeResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
return new NodesDeprecationCheckAction.NodeResponse(in);
}

Expand Down
Loading

0 comments on commit 5486783

Please sign in to comment.