
improve reroute with many shards #48579

Closed

Conversation

xiahongju
Contributor

Background: Last week, a customer of Alibaba Cloud Elasticsearch reported that creating an index took 1 minute after their cluster was migrated from version 6.3.2 to 7.4.0. The cluster has 10 data nodes and more than 50,000 shards, uses index lifecycle management, and closes each index once it moves to the cold nodes.

Testing: We built a test environment to reproduce the problem.
Elasticsearch version: 7.4.0
Dedicated master nodes: 3 * 16core64GB
Data nodes: 2 * 16core64GB

We first created 5,000 indices, each with 5 primaries and 0 replicas, for a total of 25,000 shards. Creating each additional index then took 58s. By analyzing the master's hot threads while creating an index, we found that the problem was introduced by "Add support for replicating closed indices" (#39499): starting with 7.2.0, the shards of closed indices are still initialized and allocated on data nodes.

Analysis: During reroute, BalancedShardsAllocator traverses all started shards of the cluster and, for each one, computes the total size of the shards currently relocating to that shard's node. This requires finding all shards that are INITIALIZING or being relocated. The current implementation iterates over every shard on the node, which is very time-consuming. When a cluster has many shards, almost any request that triggers a reroute hits this problem, in all versions of Elasticsearch.
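To illustrate the slow path described above, here is a minimal sketch (not the real Elasticsearch code; the class and method names only mirror `RoutingNode.shardsWithState`) of a per-call linear scan over all shards on a node:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal model of the slow path: every call to
// shardsWithState scans all shards held by the node, so with tens of
// thousands of shards per node each reroute pays O(shards) repeatedly.
public class LinearScanDemo {
    enum ShardState { STARTED, INITIALIZING, RELOCATING }

    // Returns the indices of shards in the wanted state.
    static List<Integer> shardsWithState(List<ShardState> shards, ShardState wanted) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < shards.size(); i++) { // full scan on EVERY call
            if (shards.get(i) == wanted) {
                result.add(i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<ShardState> shards = new ArrayList<>();
        for (int i = 0; i < 25_000; i++) shards.add(ShardState.STARTED);
        shards.set(42, ShardState.INITIALIZING); // only one shard is recovering
        // The scan still walks all 25,000 entries to find it.
        System.out.println(shardsWithState(shards, ShardState.INITIALIZING).size()); // 1
    }
}
```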

Solution: Since only a small number of shards in the cluster are INITIALIZING or RELOCATING at any given time, we can cache the shards in those states instead of recomputing them on every call. With this optimization, creating or closing an index in the test cluster above drops from 58s to 1.2s.
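The caching idea can be sketched as follows. This is an assumed shape, not the merged Elasticsearch code: a per-state set of shard ids is kept consistent on every state transition, so the lookup becomes a constant-time set access instead of a full scan:

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the proposed fix: maintain a set of shard ids per state,
// updated whenever a shard changes state, so shardsWithState no longer
// needs to iterate over all shards on the node.
public class CachedStateDemo {
    enum ShardState { STARTED, INITIALIZING, RELOCATING }

    private final Map<Integer, ShardState> stateById = new HashMap<>();
    private final Map<ShardState, Set<Integer>> byState = new EnumMap<>(ShardState.class);

    CachedStateDemo() {
        for (ShardState s : ShardState.values()) byState.put(s, new HashSet<>());
    }

    void setState(int shardId, ShardState newState) {
        ShardState old = stateById.put(shardId, newState);
        if (old != null) byState.get(old).remove(shardId); // keep the cache consistent
        byState.get(newState).add(shardId);
    }

    Set<Integer> shardsWithState(ShardState state) {
        return byState.get(state); // O(1) lookup, no scan over all shards
    }

    public static void main(String[] args) {
        CachedStateDemo node = new CachedStateDemo();
        for (int i = 0; i < 25_000; i++) node.setState(i, ShardState.STARTED);
        node.setState(42, ShardState.INITIALIZING); // one shard starts recovering
        System.out.println(node.shardsWithState(ShardState.INITIALIZING).size()); // 1
        System.out.println(node.shardsWithState(ShardState.STARTED).size());      // 24999
    }
}
```

The trade-off is a little extra memory and bookkeeping on each state transition in exchange for cheap reads, which pays off because reroute reads these sets far more often than shards change state.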

Although we could work around this problem by setting cluster.routing.allocation.disk.include_relocations to false, that has some drawbacks:

  1. Disk usage is underestimated, which can result in more shards being relocated.
  2. When a large number of unassigned shards need to be allocated, such as after a full cluster restart, the canAllocate of ThrottlingAllocationDecider must find the INITIALIZING shards, which is a bottleneck for the same reason as above.

Java stack traces of masterService thread

```
"elasticsearch[iZ2ze1ymtwjqspsn3jco0tZ][masterService#updateTask][T#1]" #39 daemon prio=5 os_prio=0 cpu=150732651.74ms elapsed=258053.43s tid=0x00007f7c98012000 nid=0x3006 runnable [0x00007f7ca28f8000]
   java.lang.Thread.State: RUNNABLE
    at java.util.Collections$UnmodifiableCollection$1.hasNext(java.base@13/Collections.java:1046)
    at org.elasticsearch.cluster.routing.RoutingNode.shardsWithState(RoutingNode.java:148)
    at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.sizeOfRelocatingShards(DiskThresholdDecider.java:111)
    at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.getDiskUsage(DiskThresholdDecider.java:345)
    at org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider.canRemain(DiskThresholdDecider.java:290)
    at org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders.canRemain(AllocationDeciders.java:108)
    at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.decideMove(BalancedShardsAllocator.java:668)
    at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator$Balancer.moveShards(BalancedShardsAllocator.java:628)
    at org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.allocate(BalancedShardsAllocator.java:123)
    at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:405)
    at org.elasticsearch.cluster.routing.allocation.AllocationService.reroute(AllocationService.java:370)
    at org.elasticsearch.cluster.metadata.MetaDataIndexStateService$1$1.execute(MetaDataIndexStateService.java:168)
    at org.elasticsearch.cluster.ClusterStateUpdateTask.execute(ClusterStateUpdateTask.java:47)
    at org.elasticsearch.cluster.service.MasterService.executeTasks(MasterService.java:702)
    at org.elasticsearch.cluster.service.MasterService.calculateTaskOutputs(MasterService.java:324)
    at org.elasticsearch.cluster.service.MasterService.runTasks(MasterService.java:219)
    at org.elasticsearch.cluster.service.MasterService.access$000(MasterService.java:73)
    at org.elasticsearch.cluster.service.MasterService$Batcher.run(MasterService.java:151)
    at org.elasticsearch.cluster.service.TaskBatcher.runIfNotProcessed(TaskBatcher.java:150)
    at org.elasticsearch.cluster.service.TaskBatcher$BatchedTask.run(TaskBatcher.java:188)
    at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:703)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedEsThreadPoolExecutor.java:252)
    at org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedEsThreadPoolExecutor.java:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@13/ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@13/ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(java.base@13/Thread.java:830)
```

@elasticcla

Hi @xiahongju, we have found your signature in our records, but it seems like you have signed with a different e-mail than the one used in your Git commit. Can you please add both of these e-mails into your Github profile (they can be hidden), so we can match your e-mails to your Github profile?

@DaveCTurner
Contributor

Thanks for the contribution @xiahongju. Your analysis seems sound, but this work is already under way in #47817. There is no need to track the shards that are neither INITIALIZING nor RELOCATING like this, but the change to ThrottlingAllocationDecider looks good and I will try to carry that over.

> The customer's current cluster has 10 data nodes, more than 50,000 shards, using index lifecycle management, and when the index moves to the cold node, it is closed.

This sounds like it is outside the bounds of how a cluster should be configured. For instance, note these recommendations for shard sizing, which suggest that they should have fewer, larger shards. I think they should also either be freezing the older indices (if they want them to remain searchable) or removing them entirely from the cluster (if they do not need to be searchable). They can be restored from snapshots if needed.

@DaveCTurner
Contributor

DaveCTurner commented Oct 29, 2019

@xiahongju could you sign the CLA with the email address you used for this PR, "hongju.xhj" <[email protected]>? Then I can add you as a co-author of #47817 to give you credit for the change to ThrottlingAllocationDecider.

@DaveCTurner DaveCTurner reopened this Oct 29, 2019
@DaveCTurner DaveCTurner added the :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) label Oct 29, 2019
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (:Distributed/Allocation)

@DaveCTurner
Contributor

Ah it looks like you already did, I couldn't tell that when the PR was closed. No action required here, thanks.

@xiahongju
Contributor Author

Thank you @DaveCTurner, I am very happy and willing to be a co-author and contribute to the community. You are right that tracking the shards in states other than INITIALIZING and RELOCATING does add some memory overhead. However:

  1. The objects stored in the HashSet are shallow copies, so the overhead is small, and it is worth sacrificing some space for better readability and maintainability. We could instead provide only shardsWithInitializingState and shardsWithRelocatingState and handle the other states with an iterator, since those are almost always needed only by test code, but that would be neither well abstracted nor easy to use.
  2. Since shardsWithState can be called with other states to look up shards, this commit is more extensible and stable.
  3. The appropriate abstraction of states and shardRoutings makes the implementation more elegant.
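The extensibility argument in point 2 can be sketched with a varargs form of shardsWithState built on the cached per-state sets. This is an illustrative shape, not the actual PR code; the class and method names are hypothetical:

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: instead of exposing only state-specific accessors
// (e.g. shardsWithInitializingState), a single varargs method unions the
// cached per-state sets, so callers can ask for any combination of states.
public class VarargsDemo {
    enum ShardState { STARTED, INITIALIZING, RELOCATING }

    private final Map<ShardState, Set<Integer>> byState = new EnumMap<>(ShardState.class);

    VarargsDemo() {
        for (ShardState s : ShardState.values()) byState.put(s, new HashSet<>());
    }

    void add(int shardId, ShardState state) {
        byState.get(state).add(shardId);
    }

    Set<Integer> shardsWithState(ShardState... states) {
        Set<Integer> result = new HashSet<>();
        for (ShardState s : states) {
            result.addAll(byState.get(s)); // union of the cached sets
        }
        return result;
    }

    public static void main(String[] args) {
        VarargsDemo node = new VarargsDemo();
        node.add(1, ShardState.INITIALIZING);
        node.add(2, ShardState.RELOCATING);
        node.add(3, ShardState.STARTED);
        // One call covers both "in-flight" states.
        System.out.println(node.shardsWithState(ShardState.INITIALIZING, ShardState.RELOCATING).size()); // 2
    }
}
```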

DaveCTurner pushed a commit that referenced this pull request Oct 31, 2019
Today a couple of allocation deciders iterate through all the shards on a node
to find the `INITIALIZING` or `RELOCATING` ones, and this can slow down cluster
state updates in clusters with very high-density nodes holding many thousands
of shards even if those shards belong to closed or frozen indices. This commit
pre-computes the sets of `INITIALIZING` and `RELOCATING` shards to speed up
this search.

Closes #46941
Relates #48579

Co-authored-by: "hongju.xhj" <[email protected]>
Labels
:Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) >enhancement

4 participants