diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 26714f9a65329..ba03532a9aa2f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -56,7 +56,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; import org.opensearch.index.MergePolicyProvider; @@ -818,9 +818,9 @@ public void testShardFetchCorruptedShardsUsingBatchAction() throws Exception { .get(discoveryNodes[0].getId()) .getNodeGatewayStartedShardsBatch() .get(shardId); - assertNotNull(gatewayStartedShard.get().storeException()); - assertNotNull(gatewayStartedShard.get().allocationId()); - assertTrue(gatewayStartedShard.get().primary()); + assertNotNull(gatewayStartedShard.storeException()); + assertNotNull(gatewayStartedShard.allocationId()); + assertTrue(gatewayStartedShard.primary()); } public void testSingleShardStoreFetchUsingBatchAction() throws ExecutionException, InterruptedException { @@ -949,9 +949,9 @@ private void assertNodeStoreFilesMetadataSuccessCase( } private void assertNodeGatewayStartedShardsHappyCase(GatewayStartedShard gatewayStartedShard) { - assertNull(gatewayStartedShard.get().storeException()); - assertNotNull(gatewayStartedShard.get().allocationId()); - assertTrue(gatewayStartedShard.get().primary()); + assertNull(gatewayStartedShard.storeException()); + assertNotNull(gatewayStartedShard.allocationId()); + assertTrue(gatewayStartedShard.primary()); } private void prepareIndex(String indexName, int numberOfPrimaryShards) { diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java index 18133f9b4bd1a..dbc1ee8d40832 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardBatchFetch.java @@ -12,22 +12,16 @@ import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.store.ShardAttributes; import java.lang.reflect.Array; -import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.BiFunction; -import java.util.function.Consumer; import java.util.function.Function; -import java.util.function.Supplier; /** * Implementation of AsyncShardFetch with batching support. This class is responsible for executing the fetch @@ -43,9 +37,6 @@ */ public abstract class AsyncShardBatchFetch extends AsyncShardFetch { - private final Consumer removeShardFromBatch; - private final List failedShards; - @SuppressWarnings("unchecked") AsyncShardBatchFetch( Logger logger, @@ -56,57 +47,29 @@ public abstract class AsyncShardBatchFetch extend Class clazz, BiFunction, T> responseGetter, Function> shardsBatchDataGetter, - Supplier emptyResponseBuilder, - Consumer failedShardHandler, - Function getResponseException, + V emptyResponse, Function isEmptyResponse ) { - super(logger, type, shardAttributesMap, action, batchId); - this.removeShardFromBatch = failedShardHandler; - this.failedShards = new ArrayList<>(); - this.cache = new ShardBatchCache<>( + super( logger, type, shardAttributesMap, - "BatchID=[" + batchId + "]", - clazz, - responseGetter, - shardsBatchDataGetter, - emptyResponseBuilder, - this::cleanUpFailedShards, - getResponseException, - isEmptyResponse + action, + batchId, + new ShardBatchCache<>( + logger, + type, + shardAttributesMap, + "BatchID=[" + batchId + "]", + clazz, + responseGetter, + shardsBatchDataGetter, + emptyResponse, + isEmptyResponse + ) ); } - public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map> ignoreNodes) { - FetchResult result = super.fetchData(nodes, ignoreNodes); - if (result.hasData()) { - // trigger reroute for failed shards only when all nodes have completed fetching - if (failedShards.isEmpty() == false) { - // trigger a reroute if there are any shards failed, to make sure they're picked up in next run - logger.trace("triggering another reroute for failed shards in {}", reroutingKey); - reroute("shards-failed", "shards failed in " + reroutingKey); - failedShards.clear(); - } - } - return result; - } - - /** - * Remove the shard from shardAttributesMap so it's not sent in next asyncFetch. - * Call removeShardFromBatch method to remove the shardId from the batch object created in - * ShardsBatchGatewayAllocator. - * Add shardId to failedShards, so it can be used to trigger another reroute as part of upcoming fetchData call. - * - * @param shardId shardId to be cleaned up from batch and cache. - */ - private void cleanUpFailedShards(ShardId shardId) { - shardAttributesMap.remove(shardId); - removeShardFromBatch.accept(shardId); - failedShards.add(shardId); - } - /** * Remove a shard from the cache maintaining a full batch of shards. This is needed to clear the shard once it's * assigned or failed. @@ -119,7 +82,10 @@ public void clearShard(ShardId shardId) { } /** - * Cache implementation of transport actions returning batch of shards related data in the response. It'll + * Cache implementation of transport actions returning batch of shards related data in the response. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShardsBatch} or + * {@link org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch} with memory efficient caching + * approach. * * @param Response type of transport action. * @param Data type of shard level response. @@ -130,11 +96,8 @@ static class ShardBatchCache extends AsyncShardFe private final int batchSize; private final Class shardResponseClass; private final BiFunction, T> responseConstructor; - private final Map arrayToShardId; private final Function> shardsBatchDataGetter; - private final Supplier emptyResponseBuilder; - private final Consumer failedShardHandler; - private final Function getException; + private final V emptyResponse; private final Function isEmpty; private final Logger logger; @@ -146,24 +109,19 @@ public ShardBatchCache( Class clazz, BiFunction, T> responseGetter, Function> shardsBatchDataGetter, - Supplier emptyResponseBuilder, - Consumer failedShardHandler, - Function getResponseException, + V emptyResponse, Function isEmptyResponse ) { super(Loggers.getLogger(logger, "_" + logKey), type); this.batchSize = shardAttributesMap.size(); - this.getException = getResponseException; this.isEmpty = isEmptyResponse; cache = new HashMap<>(); shardIdToArray = new HashMap<>(); - arrayToShardId = new HashMap<>(); fillShardIdKeys(shardAttributesMap.keySet()); this.shardResponseClass = clazz; this.responseConstructor = responseGetter; this.shardsBatchDataGetter = shardsBatchDataGetter; - this.emptyResponseBuilder = emptyResponseBuilder; - this.failedShardHandler = failedShardHandler; + this.emptyResponse = emptyResponse; this.logger = logger; } @@ -182,26 +140,9 @@ public void deleteShard(ShardId shardId) { } } - @Override - public Map getCacheData(DiscoveryNodes nodes, Set failedNodes) { - fillReverseIdMap(); - return super.getCacheData(nodes, failedNodes); - } - - /** - * Build a reverse map to get shardId from the array index, this will be used to construct the response which - * PrimaryShardBatchAllocator or ReplicaShardBatchAllocator are looking for. - */ - private void fillReverseIdMap() { - arrayToShardId.clear(); - for (Map.Entry indexMapping : shardIdToArray.entrySet()) { - arrayToShardId.putIfAbsent(indexMapping.getValue(), indexMapping.getKey()); - } - } - @Override public void initData(DiscoveryNode node) { - cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, getException, isEmpty)); + cache.put(node.getId(), new NodeEntry<>(node.getId(), shardResponseClass, batchSize, isEmpty)); } /** @@ -216,40 +157,9 @@ public void initData(DiscoveryNode node) { public void putData(DiscoveryNode node, T response) { NodeEntry nodeEntry = cache.get(node.getId()); Map batchResponse = shardsBatchDataGetter.apply(response); - filterFailedShards(batchResponse); nodeEntry.doneFetching(batchResponse, shardIdToArray); } - /** - * Return the shard for which we got unhandled exceptions. - * - * @param batchResponse response from one node for the batch. - */ - private void filterFailedShards(Map batchResponse) { - logger.trace("filtering failed shards"); - for (Iterator it = batchResponse.keySet().iterator(); it.hasNext();) { - ShardId shardId = it.next(); - if (batchResponse.get(shardId) != null) { - if (getException.apply(batchResponse.get(shardId)) != null) { - // handle per shard level exceptions, process other shards, only throw out this shard from - // the batch - Exception shardException = getException.apply(batchResponse.get(shardId)); - // if the request got rejected or timed out, we need to try it again next time... - if (retryableException(shardException)) { - logger.trace( - "got unhandled retryable exception for shard {} {}", - shardId.toString(), - shardException.toString() - ); - failedShardHandler.accept(shardId); - // remove this failed entry. So, while storing the data, we don't need to re-process it. - it.remove(); - } - } - } - } - } - @Override public T getData(DiscoveryNode node) { return this.responseConstructor.apply(node, getBatchData(cache.get(node.getId()))); @@ -259,12 +169,14 @@ private HashMap getBatchData(NodeEntry nodeEntry) { V[] nodeShardEntries = nodeEntry.getData(); boolean[] emptyResponses = nodeEntry.getEmptyShardResponse(); HashMap shardData = new HashMap<>(); - for (Integer shardIdIndex : shardIdToArray.values()) { - if (emptyResponses[shardIdIndex]) { - shardData.put(arrayToShardId.get(shardIdIndex), emptyResponseBuilder.get()); - } else if (nodeShardEntries[shardIdIndex] != null) { + for (Map.Entry shardIdIndex : shardIdToArray.entrySet()) { + ShardId shardId = shardIdIndex.getKey(); + Integer arrIndex = shardIdIndex.getValue(); + if (emptyResponses[arrIndex]) { + shardData.put(shardId, emptyResponse); + } else if (nodeShardEntries[arrIndex] != null) { // ignore null responses here - shardData.put(arrayToShardId.get(shardIdIndex), nodeShardEntries[shardIdIndex]); + shardData.put(shardId, nodeShardEntries[arrIndex]); } } return shardData; @@ -288,20 +200,12 @@ static class NodeEntry extends BaseNodeEntry { // actually needed in allocation/explain API response. So instead of storing full empty response object // in cache, it's better to just store a boolean and create that object on the fly just before // decision-making. - private final Function getException; private final Function isEmpty; - NodeEntry( - String nodeId, - Class clazz, - int batchSize, - Function getResponseException, - Function isEmptyResponse - ) { + NodeEntry(String nodeId, Class clazz, int batchSize, Function isEmptyResponse) { super(nodeId); this.shardData = (V[]) Array.newInstance(clazz, batchSize); this.emptyShardResponse = new boolean[batchSize]; - this.getException = getResponseException; this.isEmpty = isEmptyResponse; } @@ -324,15 +228,14 @@ boolean[] getEmptyShardResponse() { } private void fillShardData(Map shardDataFromNode, Map shardIdKey) { - for (ShardId shardId : shardDataFromNode.keySet()) { - if (shardDataFromNode.get(shardId) != null) { - if (isEmpty.apply(shardDataFromNode.get(shardId))) { + for (Map.Entry shardData : shardDataFromNode.entrySet()) { + if (shardData.getValue() != null) { + ShardId shardId = shardData.getKey(); + if (isEmpty.apply(shardData.getValue())) { this.emptyShardResponse[shardIdKey.get(shardId)] = true; this.shardData[shardIdKey.get(shardId)] = null; - } else if (getException.apply(shardDataFromNode.get(shardId)) == null) { - this.shardData[shardIdKey.get(shardId)] = shardDataFromNode.get(shardId); } - // if exception is not null, we got unhandled failure for the shard which needs to be ignored + this.shardData[shardIdKey.get(shardId)] = shardData.getValue(); } } } diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 3aa431c327a9d..29ec41beedd21 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -120,14 +120,15 @@ protected AsyncShardFetch( String type, Map shardAttributesMap, Lister, T> action, - String batchId + String batchId, + AsyncShardFetchCache cache ) { this.logger = logger; this.type = type; this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; - cache = new ShardCache<>(logger, reroutingKey, type); + this.cache = cache; } @Override diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java index 960bbaa1e0ede..1979f33484d49 100644 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java @@ -15,8 +15,8 @@ import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.gateway.AsyncShardFetch.FetchResult; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import java.util.ArrayList; @@ -136,10 +136,10 @@ private static List adaptToNodeShardStates( GatewayStartedShard shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()); nodeShardStates.add( new NodeGatewayStartedShard( - shardData.get().allocationId(), - shardData.get().primary(), - shardData.get().replicationCheckpoint(), - shardData.get().storeException(), + shardData.allocationId(), + shardData.primary(), + shardData.replicationCheckpoint(), + shardData.storeException(), node ) ); diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java index c341fd9e981ba..71c2c2cd2a444 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesGatewayStartedShardHelper.java @@ -233,8 +233,11 @@ public String toString() { return buf.toString(); } - public Boolean isEmpty() { - return allocationId == null && primary == false && storeException == null && replicationCheckpoint == null; + public static Boolean isEmpty(GatewayStartedShard gatewayStartedShard) { + return gatewayStartedShard.allocationId() == null + && gatewayStartedShard.primary() == false + && gatewayStartedShard.storeException() == null + && gatewayStartedShard.replicationCheckpoint() == null; } } diff --git a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java index 513b46d0429c1..9886e8472422c 100644 --- a/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java +++ b/server/src/main/java/org/opensearch/gateway/TransportNodesListGatewayStartedShardsBatch.java @@ -38,6 +38,7 @@ import java.util.Map; import java.util.Objects; +import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.INDEX_NOT_FOUND; import static org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.getShardInfoOnLocalNode; @@ -135,23 +136,22 @@ protected NodesGatewayStartedShardsBatch newResponse( @Override protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { Map shardsOnNode = new HashMap<>(); + // NOTE : If we ever change this for loop to run in parallel threads, we should re-visit the exception + // handling in AsyncShardBatchFetch class. for (Map.Entry shardAttr : request.shardAttributes.entrySet()) { final ShardId shardId = shardAttr.getKey(); try { shardsOnNode.put( shardId, - new GatewayStartedShard( - getShardInfoOnLocalNode( - logger, - shardId, - namedXContentRegistry, - nodeEnv, - indicesService, - shardAttr.getValue().getCustomDataPath(), - settings, - clusterService - ), - null + getShardInfoOnLocalNode( + logger, + shardId, + namedXContentRegistry, + nodeEnv, + indicesService, + shardAttr.getValue().getCustomDataPath(), + settings, + clusterService ) ); } catch (Exception e) { @@ -160,10 +160,7 @@ protected NodeGatewayStartedShardsBatch nodeOperation(NodeRequest request) { shardsOnNode.put(shardId, null); } else { // return actual exception as it is for unknown exceptions - shardsOnNode.put( - shardId, - new GatewayStartedShard(new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), e) - ); + shardsOnNode.put(shardId, new GatewayStartedShard(null, false, null, e)); } } } @@ -256,57 +253,6 @@ public void writeTo(StreamOutput out) throws IOException { } } - /** - * Primary shard response from node. It contains the metadata in - * {@link TransportNodesGatewayStartedShardHelper.GatewayStartedShard} and any exception thrown from code will be - * stored in transportError. This exception is stored specifically to disambiguate the store related exceptions - * present in {@link TransportNodesGatewayStartedShardHelper.GatewayStartedShard} as those exceptions may be used - * during decision-making. - */ - public static class GatewayStartedShard { - private final TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard; - private final Exception transportError; - - public GatewayStartedShard( - TransportNodesGatewayStartedShardHelper.GatewayStartedShard gatewayStartedShard, - Exception transportError - ) { - this.gatewayStartedShard = gatewayStartedShard; - this.transportError = transportError; - } - - public GatewayStartedShard(StreamInput in) throws IOException { - this.gatewayStartedShard = new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(in); - if (in.readBoolean()) { - this.transportError = in.readException(); - } else { - this.transportError = null; - } - } - - public void writeTo(StreamOutput out) throws IOException { - gatewayStartedShard.writeTo(out); - if (transportError != null) { - out.writeBoolean(true); - out.writeException(transportError); - } else { - out.writeBoolean(false); - } - } - - public static boolean isEmpty(GatewayStartedShard gatewayStartedShard) { - return gatewayStartedShard.get().isEmpty() && gatewayStartedShard.getTransportError() == null; - } - - public Exception getTransportError() { - return transportError; - } - - public TransportNodesGatewayStartedShardHelper.GatewayStartedShard get() { - return gatewayStartedShard; - } - } - /** * This is the response from a single node, this is used in {@link NodesGatewayStartedShardsBatch} for creating * node to its response mapping for this transport request. diff --git a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java index 3d37ab026835a..85d5bff4677ef 100644 --- a/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java +++ b/server/src/main/java/org/opensearch/indices/store/TransportNodesListShardStoreMetadataBatch.java @@ -155,7 +155,7 @@ private Map listStoreMetadata(NodeRequest reque shardStoreMetadataMap.put(shardId, new NodeStoreFilesMetadata(storeFilesMetadata, null)); } catch (Exception e) { // should return null in case of known exceptions being returned from listShardMetadataInternal method. - if (e.getMessage().contains(INDEX_NOT_FOUND)) { + if (e.getMessage().contains(INDEX_NOT_FOUND) || e instanceof IOException) { shardStoreMetadataMap.put(shardId, null); } else { // return actual exception as it is for unknown exceptions diff --git a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java index 3502cc8996fa2..db97c3ece94ba 100644 --- a/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/opensearch/gateway/AsyncShardFetchTests.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.store.ShardAttributes; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; @@ -47,7 +46,6 @@ import org.junit.Before; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -86,16 +84,7 @@ public class AsyncShardFetchTests extends OpenSearchTestCase { public void setUp() throws Exception { super.setUp(); this.threadPool = new TestThreadPool(getTestName()); - if (randomBoolean()) { - this.test = new TestFetch(threadPool); - } else { - HashMap shardToCustomDataPath = new HashMap<>(); - ShardId shardId0 = new ShardId("index1", "index_uuid1", 0); - ShardId shardId1 = new ShardId("index2", "index_uuid2", 0); - shardToCustomDataPath.put(shardId0, new ShardAttributes("")); - shardToCustomDataPath.put(shardId1, new ShardAttributes("")); - this.test = new TestFetch(threadPool, shardToCustomDataPath); - } + this.test = new TestFetch(threadPool); } @After @@ -414,11 +403,6 @@ static class Entry { this.threadPool = threadPool; } - TestFetch(ThreadPool threadPool, Map shardAttributesMap) { - super(LogManager.getLogger(TestFetch.class), "test", shardAttributesMap, null, "test-batch"); - this.threadPool = threadPool; - } - public void addSimulation(String nodeId, Response response) { simulations.put(nodeId, new Entry(response, null)); } diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java index ca66998bd448f..522ad2a64ea5d 100644 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java @@ -29,6 +29,7 @@ import org.opensearch.common.util.set.Sets; import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.Environment; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.index.IndexSettings; import org.opensearch.index.codec.CodecService; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -289,17 +290,9 @@ public TestBatchAllocator addData( if (data == null) { data = new HashMap<>(); } - Map shardData = Map.of( + Map shardData = Map.of( shardId, - new TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ), - null - ) + new GatewayStartedShard(allocationId, primary, replicationCheckpoint, storeException) ); data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); return this; @@ -316,19 +309,8 @@ public TestBatchAllocator addShardData( if (data == null) { data = new HashMap<>(); } - Map shardData = new HashMap<>(); - shardData.put( - shardId, - new TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard( - allocationId, - primary, - replicationCheckpoint, - storeException - ), - null - ) - ); + Map shardData = new HashMap<>(); + shardData.put(shardId, new GatewayStartedShard(allocationId, primary, replicationCheckpoint, storeException)); if (data.get(node) != null) shardData.putAll(data.get(node).getNodeGatewayStartedShardsBatch()); data.put(node, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch(node, shardData)); return this; diff --git a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java index 085793162b3c4..623103a0848cd 100644 --- a/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java +++ b/server/src/test/java/org/opensearch/gateway/ShardBatchCacheTests.java @@ -16,7 +16,7 @@ import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.GatewayStartedShard; +import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.GatewayStartedShard; import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; import org.opensearch.indices.store.ShardAttributes; @@ -53,9 +53,7 @@ public void setupShardBatchCache(String batchId, int numberOfShards) { GatewayStartedShard.class, NodeGatewayStartedShardsBatch::new, NodeGatewayStartedShardsBatch::getNodeGatewayStartedShardsBatch, - () -> new GatewayStartedShard(new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), null), - this::removeShard, - GatewayStartedShard::getTransportError, + new GatewayStartedShard(null, false, null, null), GatewayStartedShard::isEmpty ); } @@ -128,7 +126,7 @@ public void testPutData() { ); assertEquals(2, fetchData.size()); assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); - assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).get().allocationId()); + assertEquals("alloc-1", fetchData.get(node1).getNodeGatewayStartedShardsBatch().get(shard).allocationId()); assertEquals(10, fetchData.get(node2).getNodeGatewayStartedShardsBatch().size()); assertTrue(GatewayStartedShard.isEmpty(fetchData.get(node2).getNodeGatewayStartedShardsBatch().get(shard))); @@ -150,7 +148,7 @@ public void testNullResponses() { assertTrue(fetchData.get(node1).getNodeGatewayStartedShardsBatch().isEmpty()); } - public void testFilterFailedShards() { + public void testShardsDataWithException() { setupShardBatchCache(BATCH_ID, NUMBER_OF_SHARDS_DEFAULT); this.shardCache.initData(node1); this.shardCache.initData(node2); @@ -163,7 +161,7 @@ public void testFilterFailedShards() { // assertEquals(5, batchInfo.size()); assertEquals(2, fetchData.size()); - assertEquals(5, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); + assertEquals(10, fetchData.get(node1).getNodeGatewayStartedShardsBatch().size()); assertTrue(fetchData.get(node2).getNodeGatewayStartedShardsBatch().isEmpty()); } @@ -176,22 +174,10 @@ private Map getPrimaryResponse(List shard shardData.put(shard, null); break; case EMPTY: - shardData.put( - shard, - new GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard(null, false, null, null), - null - ) - ); + shardData.put(shard, new GatewayStartedShard(null, false, null, null)); break; case VALID: - shardData.put( - shard, - new GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), - null - ) - ); + shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); break; default: throw new AssertionError("unknown response type"); @@ -207,19 +193,10 @@ private Map getFailedPrimaryResponse(List if (failedShardsCount-- > 0) { shardData.put( shard, - new GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), - new OpenSearchRejectedExecutionException() - ) + new GatewayStartedShard("alloc-" + allocationId++, false, null, new OpenSearchRejectedExecutionException()) ); } else { - shardData.put( - shard, - new GatewayStartedShard( - new TransportNodesGatewayStartedShardHelper.GatewayStartedShard("alloc-" + allocationId++, false, null, null), - null - ) - ); + shardData.put(shard, new GatewayStartedShard("alloc-" + allocationId++, false, null, null)); } } return shardData;