Massive async shard fetch requests consume lots of heap memories on master node. #80694

Open
Tracked by #77466
howardhuanghua opened this issue Nov 14, 2021 · 10 comments · May be fixed by #81081
Labels
>bug, :Distributed Coordination/Allocation, Team:Distributed (Obsolete)

Comments

@howardhuanghua
Contributor

howardhuanghua commented Nov 14, 2021

Elasticsearch version: 7.10

JVM version (java -version): JDK 11

Description of the problem including expected versus actual behavior:
In #77991 we solved the async shard fetch response memory consumption issue.
But we found that async shard fetch requests also consume a lot of heap memory. Here is our production environment for this case:
Number of data nodes: 75
Number of dedicated master nodes: 3
Master node resources: 2 CPU cores, 8 GB physical memory, 4 GB heap.
Total number of shards: 15,000

After a full cluster restart, once the new master is elected, its heap is exhausted within a few seconds. We dumped the heap and found that Netty's in-flight outbound requests were using a large share of it:
[heap dump screenshot]

Each WriteOperation corresponds to a single-shard request to a specific node (a 16 KB buffer each):
[screenshot of WriteOperation instances in the heap dump]

In the Netty4MessageChannelHandler class we can see a queuedWrites queue; messages are flushed asynchronously:

// pending outbound messages buffered by Netty4MessageChannelHandler until they are flushed to the channel
private final Queue<WriteOperation> queuedWrites = new ArrayDeque<>();

So besides trimming the shard fetch responses, we also need to deal with the massive number of outgoing shard fetch requests.

@howardhuanghua howardhuanghua added the >bug and needs:triage labels Nov 14, 2021
@howardhuanghua howardhuanghua changed the title from "Massive async shard fetch requests cost lots of heap memories on master node." to "Massive async shard fetch requests consume lots of heap memories on master node." Nov 14, 2021
@DaveCTurner
Contributor

Relates #77466

@DaveCTurner DaveCTurner added the :Distributed Coordination/Allocation label Nov 14, 2021
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) label Nov 14, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner DaveCTurner removed the Team:Distributed (Obsolete) and needs:triage labels Nov 14, 2021
@DaveCTurner
Contributor

I can see that this might be a problem: 15k shards × 75 nodes × 16kiB buffer for each message ≈ 17GiB of memory churn when the cluster state recovers.
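(Spelling out that arithmetic: 15,000 shards × 75 nodes is 1,125,000 request messages, and at 16 KiB of buffer each that comes to roughly 17 GiB.)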

@howardhuanghua
Contributor Author

That's it. In the end we scaled the heap up to 16 GB and the cluster recovered.

@howardhuanghua
Contributor Author

The main problem is that we fetch each shard from all the data nodes, when in fact only one specific data node holds the right shard copy. Also, we don't need to fetch all shards at once at the beginning, since concurrent allocation is throttled anyway. If a single data node restarts, we can capture the unassigned shards' target node in the node-left (disassociation) event; then during shard recovery we can fetch each shard from that single target node, or from the nodes that hold the shard's primary and replicas. This is what we have already done in our local test environment. But in the full-cluster-restart case no shard routing information is persisted in the cluster state, so we don't know which node each previously allocated, now-unassigned shard was on. We could discuss more details; it would be our pleasure to optimize this part if possible.
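As a rough illustration of the single-node-restart idea above (hypothetical names, not the actual Elasticsearch API): remember which node held each shard when that node left, and consult that map later to narrow the fetch instead of asking every data node.

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch only; the real allocator and node-left handling look different.
class LastKnownShardLocations {
    // shard id -> node ids that held a copy (primary or replica) before the node left
    private final Map<String, Set<String>> lastKnownNodes = new ConcurrentHashMap<>();

    // called from a (hypothetical) node-left event handler
    void recordNodeLeft(String nodeId, Set<String> shardIdsOnNode) {
        for (String shardId : shardIdsOnNode) {
            lastKnownNodes.computeIfAbsent(shardId, k -> ConcurrentHashMap.newKeySet()).add(nodeId);
        }
    }

    // narrow the async fetch to the remembered nodes; fall back to all data nodes if unknown
    Set<String> fetchTargets(String shardId, Set<String> allDataNodes) {
        Set<String> known = lastKnownNodes.get(shardId);
        return (known == null || known.isEmpty()) ? allDataNodes : known;
    }
}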

@DaveCTurner
Contributor

DaveCTurner commented Nov 15, 2021

I suspect the problem is primary allocation (i.e. internal:gateway/local/started_shards) rather than replica allocation (i.e. internal:cluster/nodes/indices/shard/store). Replica allocation messages are naturally throttled because they happen as the primaries come online.

I have a couple of ideas for addressing this:

  1. We could batch the requests up so that there's only a limited number of requests in flight at once, maybe just one at a time. Rather than sending each request right away, if there's already one in flight we could just add it to the next batch to be sent.

  2. We could keep track of which shards are definitely not on each node and skip sending requests to nodes that don't have anything to offer.

I prefer the first idea I think. The second is effectively a cache which we could populate at cluster startup fairly easily but it would be tricky to keep its contents correct as the cluster is running (the refresh or invalidation logic is pretty complicated). Batching has the disadvantage that a shard on a slow/broken disk will hold up replies about other shards but I don't think that's a big deal in practice since a slow/broken disk on a node causes other issues. I think batching would scale better in even larger clusters too: if we stick to the one-message-per-shard model then a fairly modest shard count (~100k or so) will always need GBs of memory just for the allocation messages.
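A minimal sketch of what idea 1 could look like, using hypothetical names rather than the real Elasticsearch classes: at most one fetch request per node is in flight, and shard ids that arrive in the meantime are accumulated into the next batch.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch only, not the actual implementation.
class PerNodeBatchedFetcher {
    private final Consumer<List<String>> sendBatch; // sends one transport request carrying a batch of shard ids
    private final List<String> pending = new ArrayList<>();
    private boolean inFlight = false;

    PerNodeBatchedFetcher(Consumer<List<String>> sendBatch) {
        this.sendBatch = sendBatch;
    }

    // called for every shard we want to fetch from this node
    synchronized void fetch(String shardId) {
        pending.add(shardId);
        maybeSend();
    }

    // called when the response (or failure) for the in-flight batch arrives
    synchronized void onBatchCompleted() {
        inFlight = false;
        maybeSend();
    }

    private void maybeSend() {
        if (!inFlight && !pending.isEmpty()) {
            inFlight = true;
            List<String> batch = new ArrayList<>(pending);
            pending.clear();
            sendBatch.accept(batch);
        }
    }
}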

@DaveCTurner
Contributor

I won't have time to work on this myself in the near future, but if you would like to work on this then please do.

@howardhuanghua
Contributor Author

@DaveCTurner Thanks for the suggestion. Just to confirm the details of the batching mode: is the idea to group all the unassigned primary shards together and send them to each node, so that each node receives only a single fetch request? Otherwise we would still need to prepare one request per shard for each node.

@DaveCTurner
Contributor

Yes, that's about right. It won't be all the unassigned shards; ideally we'd keep the same overall behaviour as today, just using fewer transport messages.

Having looked at this a bit more I can see value in adding batching to the replica allocator too. There are cases (e.g. a big network partition) where we'd send a large number of internal:cluster/nodes/indices/shard/store messages today.
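For illustration only, a batched request/response for either allocator could be shaped roughly like this (hypothetical record names, not the real transport classes): one request per node carries many shard ids, and the response maps each shard id back to its result.

import java.util.List;
import java.util.Map;

// Hypothetical shapes for a batched shard-metadata fetch; names and fields are illustrative only.
record BatchedShardFetchRequest(List<String> shardIds) {}

record ShardFetchResult(String shardId, boolean hasCopy, String allocationId) {}

record BatchedShardFetchResponse(Map<String, ShardFetchResult> results) {}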

@howardhuanghua
Contributor Author

howardhuanghua commented Nov 27, 2021

Hi @DaveCTurner, I quickly implemented a draft, #81081, to batch the async shard fetch requests for primaries first.
Please help check whether it's the right direction. The main logic is in InternalPrimaryShardAllocator; there is still some work to be done.
