
Batch async fetch shards data to reduce memory consumption. #81081

Open
howardhuanghua wants to merge 25 commits into main
Conversation

@howardhuanghua howardhuanghua (Contributor) commented Nov 27, 2021

This PR is intended to fix #80694.

  1. Queue the shard-level async fetch requests and their listeners.
  2. Flush all the queued requests once all of the node-level async fetch requests have been collected.
  3. After receiving the node-level fetch responses, split them back into per-node, per-shard results and call the cached listeners (a rough sketch of this flow is shown below).
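To illustrate the intent of these three steps, here is a minimal, self-contained sketch; the class and method names (BatchedShardFetcher, enqueue, flush) are illustrative only and are not the names used in this PR.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class BatchedShardFetcher {
    // Illustrative only: one queued entry per shard, grouped by node id.
    record QueuedShard(String shardId, String customDataPath, Consumer<String> listener) {}

    private final Map<String, List<QueuedShard>> queuedByNode = new HashMap<>();

    // Step 1: queue the shard-level request and cache its listener instead of
    // sending a transport request per shard immediately.
    void enqueue(String nodeId, String shardId, String customDataPath, Consumer<String> listener) {
        queuedByNode.computeIfAbsent(nodeId, n -> new ArrayList<>())
            .add(new QueuedShard(shardId, customDataPath, listener));
    }

    // Step 2: once the allocation round has queued everything, send one
    // node-level request per node instead of one request per shard per node.
    void flush() {
        queuedByNode.forEach(this::sendNodeLevelRequest);
        queuedByNode.clear();
    }

    // Step 3: when the node-level response arrives, split it back into
    // per-shard results and complete the cached per-shard listeners.
    private void sendNodeLevelRequest(String nodeId, List<QueuedShard> shards) {
        // Transport call omitted in this sketch; on response, iterate 'shards'
        // and invoke each QueuedShard.listener() with that shard's slice of it.
    }
}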

Async shard fetch requests before/after optimization:
[Screenshots: async shard fetch request counts before and after the optimization]

@elasticsearchmachine elasticsearchmachine added v8.1.0 external-contributor Pull request authored by a developer outside the Elasticsearch team labels Nov 27, 2021
@howardhuanghua howardhuanghua marked this pull request as ready for review November 28, 2021 01:52
@howardhuanghua howardhuanghua changed the title Group primary shard async fetch requests by node to reduce memory consumption. [Draft]Group primary shard async fetch requests by node to reduce memory consumption. Nov 28, 2021
@DaveCTurner DaveCTurner (Contributor) left a comment:

This seems like the right sort of idea. I left some comments inline. I think we should do this for replica allocations too.

@@ -56,6 +56,7 @@
*/
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
void flush();
DaveCTurner (Contributor) commented:

Rather than introducing this method on the lister (and the corresponding flag passed in to fetchData), could we have the allocator directly indicate the end of an allocation round, which would trigger the flush?
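One possible shape for that suggestion, sketched with a hypothetical hook name (allocationRoundComplete is not an existing Elasticsearch API), reusing the BatchedShardFetcher sketch above:

// Hypothetical: instead of Lister.flush() plus a flag on fetchData(), the
// allocator exposes an explicit end-of-round signal that triggers the flush.
class GatewayAllocator /* implements ExistingShardsAllocator in the real code */ {
    private final BatchedShardFetcher batchedFetcher = new BatchedShardFetcher(); // from the sketch above

    // Called by the allocation service after it has iterated every unassigned
    // shard, so no further shard-level fetch requests can arrive this round.
    public void allocationRoundComplete() {
        batchedFetcher.flush();
    }
}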

for (ShardId shardId : requestMap.keySet()) {
ShardRequestInfo shardRequest = requestMap.get(shardId);
shards.put(shardRequest.shardId(), shardRequest.getCustomDataPath());
if (node.getVersion().before(Version.V_7_16_0)) {
DaveCTurner (Contributor) commented:
The version in master is now 8.1.0; it's unlikely we'll backport this to an earlier version.
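If a version gate is still needed, it would presumably move to the first version that understands the batched request; a sketch, assuming that turns out to be 8.1.0 and using hypothetical helper names:

// Sketch only: nodes older than the batched action fall back to the
// existing per-shard requests; newer nodes get one batched request.
if (node.getVersion().before(Version.V_8_1_0)) {
    sendPerShardRequests(node, shards);   // hypothetical helper
} else {
    sendBatchedRequest(node, shards);     // hypothetical helper
}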

};

client.executeLocally(
TransportNodesListGatewayStartedShards.TYPE,
DaveCTurner (Contributor) commented:
I'm undecided about re-using the same action type for both kinds of request here. I think it'd be cleaner to introduce a new one (and to name it something better than internal:gateway/local/started_shards) given how big a difference in behaviour we are making.
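A sketch of what a dedicated action might look like; the action name and response class below are placeholders rather than names proposed in the PR, and it assumes the two-argument ActionType constructor available at the time of this PR:

// Hypothetical: a separate transport action for the batched request, so the
// existing internal:gateway/local/started_shards behaviour is left untouched.
public static final ActionType<NodesGatewayBatchedStartedShards> BATCHED_TYPE = new ActionType<>(
    "internal:gateway/local/batched_started_shards",   // placeholder action name
    NodesGatewayBatchedStartedShards::new               // placeholder response reader
);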

howardhuanghua (Contributor, Author) replied:
Hi @DaveCTurner, if we introduce a new action, then we need to refactor some logic in GatewayAllocator, such as the following structures; that seems like it would be a big change for the high-level allocators. What do you think?

private final ConcurrentMap<ShardId, AsyncShardFetch<NodeGatewayStartedShards>> asyncFetchStarted = ConcurrentCollections
.newConcurrentMap();
private final ConcurrentMap<ShardId, AsyncShardFetch<NodeStoreFilesMetadata>> asyncFetchStore = ConcurrentCollections
.newConcurrentMap();

DaveCTurner (Contributor) replied:
I'm not sure this is true, I think we could keep pretty much the same interface from the point of view of GatewayAllocator. It should be possible to implement a batching Lister which reworks the batched responses into a BaseNodesResponse<NodeGatewayStartedShards>.
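A rough sketch of that adapter idea, assuming a hypothetical BatchingFetcher component that does the queuing and flushing; the point is that GatewayAllocator still sees the existing Lister contract and response types:

// Hypothetical batching Lister: per-shard callers keep receiving a
// BaseNodesResponse<NodeGatewayStartedShards>, but the transport round-trip is
// shared by every shard queued in the same allocation round. This assumes the
// original single-method Lister interface, i.e. the end-of-round signal lives elsewhere.
class BatchingStartedShardsLister
    implements AsyncShardFetch.Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> {

    private final BatchingFetcher fetcher; // hypothetical component that queues and flushes

    BatchingStartedShardsLister(BatchingFetcher fetcher) {
        this.fetcher = fetcher;
    }

    @Override
    public void list(
        ShardId shardId,
        String customDataPath,
        DiscoveryNode[] nodes,
        ActionListener<BaseNodesResponse<NodeGatewayStartedShards>> listener
    ) {
        // Queue the shard and cache the listener; when the batched node-level
        // responses arrive, extract this shard's per-node results and wrap them
        // in a BaseNodesResponse before completing the listener.
        fetcher.enqueue(shardId, customDataPath, nodes, listener);
    }
}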

-    protected NodeGatewayStartedShards nodeOperation(NodeRequest request, Task task) {
+    protected NodeGroupedGatewayStartedShards nodeOperation(NodeRequest request, Task task) {
+        NodeGroupedGatewayStartedShards groupedStartedShards = new NodeGroupedGatewayStartedShards(clusterService.localNode());
+        for (Map.Entry<ShardId, String> entry : request.getShards().entrySet()) {
DaveCTurner (Contributor) commented:
When sending these requests per-shard we execute them in parallel across the FETCH_SHARD_STARTED threadpool. I think we should continue to parallelise them at the shard level like that.
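For example (a sketch only, with listShard and the add(...) accumulator as assumed names), the node-level handler could still fan each shard out to the FETCH_SHARD_STARTED executor and only assemble the grouped response once every shard has been listed:

// Sketch: keep per-shard parallelism on the data node by submitting each
// shard's listing to the FETCH_SHARD_STARTED threadpool, then collecting the
// results into the single grouped node-level response.
List<Future<NodeGatewayStartedShards>> futures = new ArrayList<>();
for (Map.Entry<ShardId, String> entry : request.getShards().entrySet()) {
    futures.add(
        threadPool.executor(ThreadPool.Names.FETCH_SHARD_STARTED)
            .submit(() -> listShard(entry.getKey(), entry.getValue()))   // listShard is a hypothetical helper
    );
}
for (Future<NodeGatewayStartedShards> future : futures) {
    try {
        groupedStartedShards.add(future.get());   // assumes the grouped response exposes an add(...) method
    } catch (Exception e) {
        throw new ElasticsearchException("failed to list started shards", e);
    }
}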

@howardhuanghua howardhuanghua (Contributor, Author) commented:

Thanks for the suggestion. I will work on completing the optimization.

@pgomulka pgomulka added the :Search/Search Search-related issues that do not fall into other categories label Nov 30, 2021
@elasticmachine elasticmachine added the Team:Search Meta label for search team label Nov 30, 2021
@elasticmachine (Collaborator): Pinging @elastic/es-search (Team:Search)

@ywelsch ywelsch added :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) and removed :Search/Search Search-related issues that do not fall into other categories labels Nov 30, 2021
@elasticmachine elasticmachine added Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. and removed Team:Search Meta label for search team labels Nov 30, 2021
@elasticmachine (Collaborator): Pinging @elastic/es-distributed (Team:Distributed)

@howardhuanghua howardhuanghua changed the title [Draft]Group primary shard async fetch requests by node to reduce memory consumption. Group primary shard async fetch requests by node to reduce memory consumption. Dec 7, 2021
@howardhuanghua howardhuanghua changed the title Group primary shard async fetch requests by node to reduce memory consumption. Batch async fetch shards data to reduce memory consumption. Dec 7, 2021
@howardhuanghua howardhuanghua (Contributor, Author) commented:

> it'd help if we could break this down into several smaller (and easier-to-review) steps somehow.

Thanks. I will try to break this down into smaller steps. Any suggestions would be appreciated.

@DaveCTurner DaveCTurner (Contributor) commented:

> Thanks. I will try to break this down into smaller steps. Any suggestions would be appreciated.

IMO the trickiest part is that today there's no well-defined "end" to the shard-by-shard fetching process, and there needs to be an end so that we know when to flush the last batch of requests. This PR adds this event (and a lot of other things besides). I think making that change in isolation would be worth trying as a first step. The change won't mean very much on its own, because we won't actually be doing any flushing at the end, but once we have this flushing event I believe it'll be easier to review the changes that add batching on top of it.

Independently, I think you could detach TransportIndicesShardStoresAction from the InternalAsyncFetch mechanism. This action doesn't do any caching (and is hardly used anyway) so there's no need to involve it in these changes.

@howardhuanghua howardhuanghua (Contributor, Author) commented:

Hi @DaveCTurner, in the following logic from the PR above, we first iterate over all the unassigned primaries; each allocator.allocateUnassigned call adds a single pending fetching shard, and after the while loop we call flush again to handle the rest of the pending shards. Is this the "end" that you mentioned above?

private void allocateExistingUnassignedShards(RoutingAllocation allocation) {
        allocation.routingNodes().unassigned().sort(PriorityComparator.getAllocationComparator(allocation)); // sort for priority ordering

        for (final ExistingShardsAllocator existingShardsAllocator : existingShardsAllocators.values()) {
            existingShardsAllocator.beforeAllocation(allocation);
        }

        GatewayAllocator gatewayAllocator = null;
        if (logger.isDebugEnabled()) {
            logger.debug("set batch fetch mode [{}] for routing allocation.", batchFetchShardEnable);
        }
        allocation.setBatchShardFetchMode(batchFetchShardEnable);

        final RoutingNodes.UnassignedShards.UnassignedIterator primaryIterator = allocation.routingNodes().unassigned().iterator();
        while (primaryIterator.hasNext()) {
            final ShardRouting shardRouting = primaryIterator.next();
            if (shardRouting.primary()) {
                ExistingShardsAllocator allocator = getAllocatorForShard(shardRouting, allocation);
                allocator.allocateUnassigned(shardRouting, allocation, primaryIterator);
                if (gatewayAllocator == null && allocator instanceof GatewayAllocator) {
                    gatewayAllocator = (GatewayAllocator) allocator;
                }

                if (gatewayAllocator != null
                    && gatewayAllocator.getPrimaryPendingFetchShardCount() > 0
                    && gatewayAllocator.getPrimaryPendingFetchShardCount() % batchFetchShardStepSize == 0) {
                    gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize);
                }
            }
        }

        // flush the remaining primaries
        if (gatewayAllocator != null) {
            gatewayAllocator.flushPendingPrimaryFetchRequests(batchFetchShardStepSize);
        }

Original code:
https://github.com/TencentCloudES/elasticsearch/blob/e415426c511088ca5b0d4e86b205a6ab2b025648/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java#L592-L595

DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Mar 14, 2023
- No need to use an `AsyncShardFetch` here, there is no caching
- Response may be very large, introduce chunking
- Fan-out may be very large, introduce throttling
- Processing time may be nontrivial, introduce cancellability
- Eliminate many unnecessary intermediate data structures
- Do shard-level response processing more eagerly
- Determine allocation from `RoutingTable` not `RoutingNodes`
- Add tests

Relates elastic#81081
elasticsearchmachine pushed a commit that referenced this pull request Mar 27, 2023
- No need to use an `AsyncShardFetch` here, there is no caching
- Response may be very large, introduce chunking
- Fan-out may be very large, introduce throttling
- Processing time may be nontrivial, introduce cancellability
- Eliminate many unnecessary intermediate data structures
- Do shard-level response processing more eagerly
- Determine allocation from `RoutingTable` not `RoutingNodes`
- Add tests

Relates #81081
@gmarouli gmarouli added v8.9.0 and removed v8.8.0 labels Apr 26, 2023
@quux00 quux00 added v8.11.0 and removed v8.10.0 labels Aug 16, 2023
@mattc58 mattc58 added v8.12.0 and removed v8.11.0 labels Oct 4, 2023
@elasticsearchmachine elasticsearchmachine added v9.1.0 Team:Distributed Coordination Meta label for Distributed Coordination team and removed v9.0.0 labels Jan 30, 2025
@elasticsearchmachine (Collaborator): Pinging @elastic/es-distributed-obsolete (Team:Distributed (Obsolete))

@elasticsearchmachine (Collaborator): Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

Labels

- >bug
- :Distributed Coordination/Allocation: All issues relating to the decision making around placing a shard (both master logic & on the nodes)
- external-contributor: Pull request authored by a developer outside the Elasticsearch team
- Team:Distributed Coordination: Meta label for Distributed Coordination team
- Team:Distributed (Obsolete): Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination.
- v9.1.0
Development

Successfully merging this pull request may close these issues.

Massive async shard fetch requests consume lots of heap memories on master node.