diff --git a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java index 0d9272d0d5689..994631d4bf026 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncBatchShardFetch.java @@ -37,7 +37,6 @@ import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; public abstract class AsyncBatchShardFetch implements Releasable { @@ -46,13 +45,13 @@ public abstract class AsyncBatchShardFetch implement * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(DiscoveryNode[] nodes,Map shardIdsWithCustomDataPath, ActionListener listener); + void list(DiscoveryNode[] nodes, Map shardIdsWithCustomDataPath, ActionListener listener); } protected final Logger logger; protected final String type; private final String batchUUID; - protected Map shardsToCustomDataPathMap; + protected Map shardsToCustomDataPathMap; private Map> ignoredShardToNodes = new HashMap<>(); private final AsyncBatchShardFetch.Lister, T> action; private final Map> cache = new HashMap<>(); @@ -362,7 +361,7 @@ public Map getData() { * Process any changes needed to the allocation based on this fetch result. */ public void processAllocation(RoutingAllocation allocation) { - for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { + for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { ShardId shardId = entry.getKey(); Set ignoreNodes = entry.getValue(); if (ignoreNodes.isEmpty() == false) { diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 6dc7011a31f3b..794aacb0c2fd1 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -83,7 +83,7 @@ public interface Lister, N protected final Logger logger; protected final String type; - protected final Map shardToCustomDataPath; + protected final Map shardToCustomDataPath; private final Lister, T> action; private final Map> cache = new HashMap<>(); private final AtomicLong round = new AtomicLong(); @@ -103,7 +103,7 @@ protected AsyncShardFetch( ) { this.logger = logger; this.type = type; - shardToCustomDataPath =new HashMap<>(); + shardToCustomDataPath = new HashMap<>(); shardToCustomDataPath.put(shardId, customDataPath); this.action = (Lister, T>) action; this.logKey = "ShardId=[" + shardId.toString() + "]"; @@ -122,11 +122,10 @@ protected AsyncShardFetch( this.type = type; this.shardToCustomDataPath = shardToCustomDataPath; this.action = (Lister, T>) action; - this.logKey = "BatchID=[" + batchId+ "]"; + this.logKey = "BatchID=[" + batchId + "]"; enableBatchMode = true; } - @Override public synchronized void close() { this.closed = true; @@ -157,11 +156,12 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map fetchData(DiscoveryNodes nodes, Map ignoreNodeSet.isEmpty() == false)) { - reroute(logKey, "nodes failed [" + failedNodes.size() + "], ignored [" - + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + "]"); + if (failedNodes.isEmpty() == false + || allIgnoreNodesMap.values().stream().anyMatch(ignoreNodeSet -> ignoreNodeSet.isEmpty() == false)) { + reroute( + logKey, + "nodes failed [" + + failedNodes.size() + + "], ignored [" + + allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum() + + "]" + ); } return new FetchResult<>(fetchData, allIgnoreNodesMap); @@ -398,13 +405,13 @@ public void onFailure(Exception e) { */ public static class FetchResult { - private final Map data; - private final Map> ignoredShardToNodes; + private final Map data; + private final Map> ignoredShardToNodes; - public FetchResult(Map data, Map> ignoreNodes) { - this.data = data; - this.ignoredShardToNodes = ignoreNodes; - } + public FetchResult(Map data, Map> ignoreNodes) { + this.data = data; + this.ignoredShardToNodes = ignoreNodes; + } /** * Does the result actually contain data? If not, then there are on going fetch @@ -423,20 +430,20 @@ public Map getData() { return this.data; } - /** - * Process any changes needed to the allocation based on this fetch result. - */ - public void processAllocation(RoutingAllocation allocation) { - for(Map.Entry> entry : ignoredShardToNodes.entrySet()) { - ShardId shardId = entry.getKey(); - Set ignoreNodes = entry.getValue(); - if (ignoreNodes.isEmpty() == false) { - ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + /** + * Process any changes needed to the allocation based on this fetch result. + */ + public void processAllocation(RoutingAllocation allocation) { + for (Map.Entry> entry : ignoredShardToNodes.entrySet()) { + ShardId shardId = entry.getKey(); + Set ignoreNodes = entry.getValue(); + if (ignoreNodes.isEmpty() == false) { + ignoreNodes.forEach(nodeId -> allocation.addIgnoreShardForNode(shardId, nodeId)); + } } - } + } } -} /** * A node entry, holding the state of the fetched data for a specific shard diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 81a205f5a9235..e74cb375a3bc4 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -270,6 +270,7 @@ protected void reroute(String logKey, String reason) { ); } } + class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards startedAction; @@ -295,9 +296,11 @@ protected AsyncShardFetch.FetchResult shardState = fetch.fetchData( allocation.nodes(), - new HashMap<>() {{ - put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); - }} + new HashMap<>() { + { + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + } + } ); if (shardState.hasData()) { @@ -332,9 +335,11 @@ protected AsyncShardFetch.FetchResult shardStores = fetch.fetchData( allocation.nodes(), - new HashMap<>() {{ - put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); - }} + new HashMap<>() { + { + put(shard.shardId(), allocation.getIgnoreNodes(shard.shardId())); + } + } ); if (shardStores.hasData()) { shardStores.processAllocation(allocation); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java index 61ae5e62b8a83..0abbb497039d9 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShards.java @@ -125,7 +125,11 @@ public TransportNodesListGatewayStartedShards( } @Override - public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { + public void list( + Map shardIdsWithCustomDataPath, + DiscoveryNode[] nodes, + ActionListener listener + ) { assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); final String customDataPath = shardIdsWithCustomDataPath.get(shardId); diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java index e2447a1c5612c..32e792169a811 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -126,8 +126,12 @@ public TransportNodesListShardStoreMetadata( } @Override - public void list(Map shardIdsWithCustomDataPath, DiscoveryNode[] nodes, ActionListener listener) { - assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; + public void list( + Map shardIdsWithCustomDataPath, + DiscoveryNode[] nodes, + ActionListener listener + ) { + assert shardIdsWithCustomDataPath.size() == 1 : "only one shard should be specified"; final ShardId shardId = shardIdsWithCustomDataPath.keySet().iterator().next(); final String customDataPath = shardIdsWithCustomDataPath.get(shardId); execute(new Request(shardId, customDataPath, nodes), listener); diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java index c1da9575d4a83..21c16eb3cbc6d 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardAllocatorTests.java @@ -850,9 +850,11 @@ protected AsyncShardFetch.FetchResult( data, new HashMap<>(){{ - put(shardId, Collections.emptySet()); - }}); + return new AsyncShardFetch.FetchResult<>(data, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java index f32f541f5ba4e..dc28315465c02 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardAllocatorTests.java @@ -727,9 +727,11 @@ protected AsyncShardFetch.FetchResult(tData, new HashMap<>(){{ - put(shardId, Collections.emptySet()); - }}); + return new AsyncShardFetch.FetchResult<>(tData, new HashMap<>() { + { + put(shardId, Collections.emptySet()); + } + }); } @Override