Too many async fetch shard results caused JVM heap explosion during cluster recovering. #76218

Open
howardhuanghua opened this issue Aug 8, 2021 · 6 comments
Labels
>bug :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination.

Comments

@howardhuanghua
Contributor

howardhuanghua commented Aug 8, 2021

Cluster version: 7.10.1.
Dedicated master nodes: 3.
Dedicated data nodes: 163.
Total primary shards: 80k, no replicas.

After a full cluster restart, the master fetches the state of every primary from all of the data nodes concurrently:

protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
    // explicitly type lister, some IDEs (Eclipse) are not able to correctly infer the function type
    Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> lister = this::listStartedShards;
    AsyncShardFetch<NodeGatewayStartedShards> fetch =
        asyncFetchStarted.computeIfAbsent(shard.shardId(),
            shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
                IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
                lister));
    AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState =
        fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
    if (shardState.hasData()) {
        shardState.processAllocation(allocation);
    }
    return shardState;
}

Each fetch result mainly consists of a DiscoveryNode and a TransportAddress, and uses around 1.7KB of heap.
So the fetch results for a single shard cost about 1.7KB * 163 data nodes = 280KB, and 80k shards cost about 80000 * 280KB = 21GB.
This heap cost is large enough to exhaust the current master node's JVM heap:
image
image

Even with a more reasonable 50k shards in a cluster, this would need almost 15GB of heap, which is a huge memory cost.
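
The arithmetic above can be reproduced with a short back-of-the-envelope calculation. This is only a sketch: the class and method names are made up for illustration, and the 1.7KB per-result figure is the rough measurement quoted in this report, not an exact object size. Depending on how the intermediate values are rounded and whether KB means 1000 or 1024 bytes, the totals land close to, but not exactly on, the 21GB and 15GB figures above.

```java
// Hypothetical helper, not part of Elasticsearch: estimates how much heap the
// master would hold if it cached one fetch result per (shard, data node) pair.
final class FetchCacheEstimate {

    // Total bytes = size of one result * number of data nodes * number of shards.
    static long totalBytes(long bytesPerResult, int dataNodes, int shards) {
        return bytesPerResult * dataNodes * shards;
    }

    public static void main(String[] args) {
        long bytesPerResult = 1_700L; // ~1.7KB per result, figure taken from the report
        int dataNodes = 163;          // dedicated data nodes in the reported cluster
        for (int shards : new int[] {50_000, 80_000}) {
            long bytes = totalBytes(bytesPerResult, dataNodes, shards);
            System.out.printf("%d shards -> %.1f GiB%n", shards, bytes / (1024.0 * 1024 * 1024));
        }
    }
}
```

The cost grows with the product of shard count and node count, which is why adding data nodes makes the problem worse even when the shard count stays fixed.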

Several ideas for solving this issue:

  1. Each shard should belong to only one node, so after a cluster restart we should not need to send fetch requests to all of the data nodes. However, in a freshly started cluster the routing table has no information about the node a shard was previously allocated to. Could we save the node ID in IndexMetadata, just like inSyncAllocationIds? Then we could send a single fetch request to the node the shard used to be allocated to.

  2. BaseNodeResponse contains a DiscoveryNode and extends the abstract class TransportMessage, and TransportMessage holds an entry that is duplicated in DiscoveryNode. In the fetch case only the node ID is required, and the node attributes are not needed at all. Could we return only the node ID in the shard fetch response instead of these heavy structures?

  3. Shard recovery has node-level concurrency limits. Could we fetch the store info for only part of the shards at a time, instead of fetching all of the shards together at the beginning?

  4. Send one request per node, and batch the results for all of the shards together.
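
To make idea 4 concrete, here is a minimal sketch of what building one batched request per node could look like. Everything in it is hypothetical: `NodeBatchRequest` and `buildRequests` are illustrative names, shard and node IDs are plain strings, and none of this corresponds to an existing Elasticsearch transport action.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical sketch of idea 4: one request per node that lists every shard,
// instead of one request per (shard, node) pair.
final class BatchedFetchSketch {

    // Illustrative request type: the target node plus all shards we ask it about.
    static final class NodeBatchRequest {
        final String nodeId;
        final List<String> shardIds;

        NodeBatchRequest(String nodeId, List<String> shardIds) {
            this.nodeId = nodeId;
            this.shardIds = shardIds;
        }
    }

    // Builds exactly one request per node; the immutable shard list is shared
    // between requests rather than copied for each node.
    static List<NodeBatchRequest> buildRequests(Collection<String> nodeIds, Collection<String> shardIds) {
        List<String> shards = List.copyOf(shardIds);
        List<NodeBatchRequest> requests = new ArrayList<>();
        for (String nodeId : nodeIds) {
            requests.add(new NodeBatchRequest(nodeId, shards));
        }
        return requests;
    }

    public static void main(String[] args) {
        List<NodeBatchRequest> requests =
            buildRequests(List.of("node-1", "node-2", "node-3"), List.of("shard-0", "shard-1"));
        System.out.println(requests.size() + " requests instead of " + (3 * 2));
    }
}
```

This reduces the number of messages from shards * nodes to nodes, but it says nothing about how much data the master keeps once the responses arrive.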

@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner
Contributor

First of all, yes, this does seem like a problem, and I appreciate the detailed report. I'm not sure everyone would describe 50k shards in a single cluster as "reasonable", and I'm not totally surprised that there are some things we don't test so thoroughly at that scale, but we do track gaps like the one you've found so I'll add it to the list.

The ideas you suggest are all insightful, although none is an obvious winner.

  1. I think tracking the node ID alongside the allocation ID would find some challenging corner cases, for instance to do with dangling indices, although at a high level I see why this might make sense.

  2. We pass DiscoveryNode objects around over the wire in lots of places as if they're lightweight values, but really these days they're quite heavy things. Maybe we should try and find a way to deduplicate the DiscoveryNode received over the wire, because we almost certainly have an instance of the right thing in memory already. In this case the problem is more that we keep each multi-kB NodeGatewayStartedShards in the cache; I think we could keep hold of something much lighter.

  3. It's not really the number of concurrent fetches that's a problem, it's that we have to keep the details of all previous fetches around in memory until the shard is allocated. If we can't allocate it straight away then we must try with other shards, or else we'd risk blocking some allocations behind other ones that will never work. We could try dropping data from the cache to bound its size perhaps, although we'd have to add some extra machinery to prevent looping forever.

  4. I think batching would be similarly challenging. Again, it's not the concurrent fetches that present the problem, it's the size of the cache, and batching wouldn't help with that.
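
As a rough illustration of the "drop data from the cache to bound its size" option in point 3, a size-bounded cache can be sketched with the JDK's `LinkedHashMap`. This is only a generic LRU sketch under assumed names (`BoundedFetchCache` is hypothetical); it is not how the allocator's fetch cache works, and it deliberately leaves out the extra machinery needed to avoid evicting and re-fetching the same shards forever.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: a cache that never holds more than maxEntries results.
// Evicted shards would have to be fetched again before they can be allocated.
final class BoundedFetchCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedFetchCache(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after each insertion; returning true drops the
    // least recently used entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }

    public static void main(String[] args) {
        BoundedFetchCache<String, String> cache = new BoundedFetchCache<>(2);
        cache.put("shard-0", "result-0");
        cache.put("shard-1", "result-1");
        cache.get("shard-0");             // touch shard-0 so shard-1 becomes the eldest
        cache.put("shard-2", "result-2"); // evicts shard-1
        System.out.println(cache.keySet());
    }
}
```

The bound caps heap usage, but each eviction turns into another round of fetches, which is where the risk of looping comes from.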

@howardhuanghua
Contributor Author

@DaveCTurner Thanks for the reply. The second idea, reducing the size of the response kept in the cache, seems simple and straightforward as a tentative solution. For the fetch case, the master could convert the DiscoveryNode to just a node ID when it receives the heavy result. ^^
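
A minimal sketch of that conversion, assuming stand-in types rather than the real Elasticsearch classes: `HeavyNode`, `HeavyResult`, `LightResult` and `purge` are all hypothetical names, and the `allocationId` and `primary` fields are just examples of what the allocator might still need after the node details are dropped.

```java
import java.util.Map;

// Hypothetical sketch: shrink a heavy per-node fetch result to the few fields
// worth caching, keeping only the node ID instead of the full node object.
final class ShardFetchPurgeSketch {

    // Stand-in for a heavy node descriptor (ID plus address and attributes).
    static final class HeavyNode {
        final String id;
        final String address;
        final Map<String, String> attributes;

        HeavyNode(String id, String address, Map<String, String> attributes) {
            this.id = id;
            this.address = address;
            this.attributes = attributes;
        }
    }

    // Stand-in for the response as received over the wire.
    static final class HeavyResult {
        final HeavyNode node;
        final String allocationId;
        final boolean primary;

        HeavyResult(HeavyNode node, String allocationId, boolean primary) {
            this.node = node;
            this.allocationId = allocationId;
            this.primary = primary;
        }
    }

    // What would be kept in the cache: no reference back to the heavy node.
    static final class LightResult {
        final String nodeId;
        final String allocationId;
        final boolean primary;

        LightResult(String nodeId, String allocationId, boolean primary) {
            this.nodeId = nodeId;
            this.allocationId = allocationId;
            this.primary = primary;
        }
    }

    // Copies the small fields so the heavy result can be garbage collected.
    static LightResult purge(HeavyResult heavy) {
        return new LightResult(heavy.node.id, heavy.allocationId, heavy.primary);
    }

    public static void main(String[] args) {
        HeavyNode node = new HeavyNode("node-1", "10.0.0.1:9300", Map.of("zone", "a"));
        LightResult light = purge(new HeavyResult(node, "alloc-42", true));
        System.out.println(light.nodeId + " " + light.allocationId + " " + light.primary);
    }
}
```

The saving only materializes if nothing else keeps a reference to the heavy result after `purge` returns.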


@DaveCTurner
Contributor

Yes, I think that's a nice self-contained improvement to do here. On reflection I think it doesn't completely fix the problem: there's still a risk that we collect almost all the responses in memory before inserting them into the cache, but this is unlikely. Are you offering a PR? That'd be great if so.

@howardhuanghua
Contributor Author

Thanks. I will submit a PR once the implementation is done.

@howardhuanghua
Contributor Author

Hi @DaveCTurner, I have implemented logic that purges the shard fetch results after the master node receives them. Please take a look, thanks.
#77266
