Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundant response of empty result in AsyncShardFetch to avoid OOM issue #84010

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/84010.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 84010
summary: Remove redundant response of empty result in AsyncShardFetch
area: Distributed
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShard
String customDataPath,
Lister<? extends BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> action
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, x -> true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Collections.emptySet;
Expand Down Expand Up @@ -65,20 +66,24 @@ public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, N
private final Set<String> nodesToIgnore = new HashSet<>();
private final AtomicLong round = new AtomicLong();
private boolean closed;
private Predicate<T> validResultPredicate;
private Set<DiscoveryNode> completeNodes = new HashSet<>();

@SuppressWarnings("unchecked")
protected AsyncShardFetch(
Logger logger,
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
Predicate<T> validResultPredicate
) {
this.logger = logger;
this.type = type;
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
this.action = (Lister<BaseNodesResponse<T>, T>) action;
this.validResultPredicate = validResultPredicate;
}

@Override
Expand Down Expand Up @@ -206,6 +211,10 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
// if the entry is there, for the right fetching round and not marked as failed already, process it
logger.trace("{} marking {} as done for [{}], result is [{}]", shardId, nodeEntry.getNodeId(), type, response);
nodeEntry.doneFetching(response);
completeNodes.add(response.getNode());
if (!validResultPredicate.test(response)) {
cache.remove(response.getNode().getId());
}
}
}
}
Expand Down Expand Up @@ -262,6 +271,7 @@ protected synchronized void processAsyncFetch(List<T> responses, List<FailedNode
*/
synchronized void clearCacheForNode(String nodeId) {
cache.remove(nodeId);
completeNodes.remove(nodeId);
}

/**
Expand All @@ -272,12 +282,13 @@ private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, D
// verify that all current data nodes are there
for (Map.Entry<String, DiscoveryNode> cursor : nodes.getDataNodes().entrySet()) {
DiscoveryNode node = cursor.getValue();
if (shardCache.containsKey(node.getId()) == false) {
if (completeNodes.contains(node.getId()) == false && shardCache.containsKey(node.getId()) == false) {
shardCache.put(node.getId(), new NodeEntry<T>(node.getId()));
}
}
// remove nodes that are not longer part of the data nodes set
shardCache.keySet().removeIf(nodeId -> nodes.nodeExists(nodeId) == false);
completeNodes.removeIf(node -> !nodes.nodeExists(node.getId()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class GatewayAllocator implements ExistingShardsAllocator {
Expand Down Expand Up @@ -217,9 +218,10 @@ class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T>
String type,
ShardId shardId,
String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action
Lister<? extends BaseNodesResponse<T>, T> action,
Predicate<T> validResultPredicate
) {
super(logger, type, shardId, customDataPath, action);
super(logger, type, shardId, customDataPath, action, validResultPredicate);
}

@Override
Expand Down Expand Up @@ -256,7 +258,8 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
"shard_started",
shardId,
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
lister
lister,
result -> result.allocationId() != null
)
);
AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState = fetch.fetchData(
Expand Down Expand Up @@ -304,7 +307,8 @@ protected AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> fetchData(ShardRou
"shard_store",
shard.shardId(),
IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
lister
lister,
result -> !result.storeFilesMetadata().isEmpty()
)
);
AsyncShardFetch.FetchResult<NodeStoreFilesMetadata> shardStores = fetch.fetchData(
Expand Down