From c26c6c116c137ebb81d6ca711bd20b281753ae4c Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 23 Feb 2024 17:11:20 +0530 Subject: [PATCH 01/11] Abstract AsyncShardFetch cache to allow restructuring for batch mode Signed-off-by: Aman Khare --- .../opensearch/gateway/AsyncShardFetch.java | 244 +------------ .../opensearch/gateway/BaseShardCache.java | 329 ++++++++++++++++++ .../org/opensearch/gateway/ShardCache.java | 80 +++++ 3 files changed, 425 insertions(+), 228 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/BaseShardCache.java create mode 100644 server/src/main/java/org/opensearch/gateway/ShardCache.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 50774f7e0cb1c..4de8447f07421 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -86,7 +86,7 @@ public interface Lister, N protected final String type; protected final Map shardAttributesMap; private final Lister, T> action; - private final Map> cache = new HashMap<>(); + private final BaseShardCache cache; private final AtomicLong round = new AtomicLong(); private boolean closed; private final String reroutingKey; @@ -109,6 +109,7 @@ protected AsyncShardFetch( this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; enableBatchMode = false; + cache = new ShardCache<>(logger, reroutingKey, type); } /** @@ -134,6 +135,7 @@ protected AsyncShardFetch( this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; enableBatchMode = true; + cache = new ShardCache<>(logger, reroutingKey, type); } @Override @@ -141,19 +143,6 @@ public synchronized void close() { this.closed = true; } - /** - * Returns the number of async fetches that are currently ongoing. - */ - public synchronized int getNumberOfInFlightFetches() { - int count = 0; - for (NodeEntry nodeEntry : cache.values()) { - if (nodeEntry.isFetching()) { - count++; - } - } - return count; - } - /** * Fetches the data for the relevant shard. If there any ongoing async fetches going on, or new ones have * been initiated by this call, the result will have no data. 
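[Illustrative aside, not part of the diff: the caller-side contract of fetchData() is unchanged by this refactor. An allocator polls it and consumes the result only once hasData() returns true; the names fetch, nodes, ignoreNodes and allocation below are hypothetical.]

    AsyncShardFetch.FetchResult<T> result = fetch.fetchData(nodes, ignoreNodes);
    if (result.hasData()) {
        // every reachable data node has answered; record ignored nodes and consume the data
        result.processAllocation(allocation);
    }
    // otherwise an async round is still in flight and the scheduled reroute retries fetchData()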
@@ -187,48 +176,26 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map> nodesToFetch = findNodesToFetch(cache); - if (nodesToFetch.isEmpty() == false) { + cache.fillShardCacheWithDataNodes(nodes); + List nodeIds = cache.findNodesToFetch(); + if (nodeIds.isEmpty() == false) { // mark all node as fetching and go ahead and async fetch them // use a unique round id to detect stale responses in processAsyncFetch final long fetchingRound = round.incrementAndGet(); - for (NodeEntry nodeEntry : nodesToFetch) { - nodeEntry.markAsFetching(fetchingRound); - } - DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream() - .map(NodeEntry::getNodeId) + cache.markAsFetching(nodeIds, fetchingRound); + DiscoveryNode[] discoNodesToFetch = nodeIds.stream() .map(nodes::get) .toArray(DiscoveryNode[]::new); asyncFetch(discoNodesToFetch, fetchingRound); } // if we are still fetching, return null to indicate it - if (hasAnyNodeFetching(cache)) { + if (cache.hasAnyNodeFetching()) { return new FetchResult<>(null, emptyMap()); } else { // nothing to fetch, yay, build the return value - Map fetchData = new HashMap<>(); Set failedNodes = new HashSet<>(); - for (Iterator>> it = cache.entrySet().iterator(); it.hasNext();) { - Map.Entry> entry = it.next(); - String nodeId = entry.getKey(); - NodeEntry nodeEntry = entry.getValue(); - - DiscoveryNode node = nodes.get(nodeId); - if (node != null) { - if (nodeEntry.isFailed()) { - // if its failed, remove it from the list of nodes, so if this run doesn't work - // we try again next round to fetch it again - it.remove(); - failedNodes.add(nodeEntry.getNodeId()); - } else { - if (nodeEntry.getValue() != null) { - fetchData.put(node, nodeEntry.getValue()); - } - } - } - } + Map fetchData = cache.populateCache(nodes, failedNodes); Map> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes)); // clear the nodes to ignore, we had a successful run in fetching everything we can @@ -268,77 +235,18 @@ protected synchronized void processAsyncFetch(List responses, List nodeEntry = cache.get(response.getNode().getId()); - if (nodeEntry != null) { - if (nodeEntry.getFetchingRound() != fetchingRound) { - assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; - logger.trace( - "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - reroutingKey, - nodeEntry.getNodeId(), - type, - nodeEntry.getFetchingRound(), - fetchingRound - ); - } else if (nodeEntry.isFailed()) { - logger.trace( - "{} node {} has failed for [{}] (failure [{}])", - reroutingKey, - nodeEntry.getNodeId(), - type, - nodeEntry.getFailure() - ); - } else { - // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.trace("{} marking {} as done for [{}], result is [{}]", reroutingKey, nodeEntry.getNodeId(), type, response); - nodeEntry.doneFetching(response); - } - } - } + cache.processResponses(responses, fetchingRound); } if (failures != null) { - for (FailedNodeException failure : failures) { - logger.trace("{} processing failure {} for [{}]", reroutingKey, failure, type); - NodeEntry nodeEntry = cache.get(failure.nodeId()); - if (nodeEntry != null) { - if (nodeEntry.getFetchingRound() != fetchingRound) { - assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; - logger.trace( - "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - reroutingKey, - 
nodeEntry.getNodeId(), - type, - nodeEntry.getFetchingRound(), - fetchingRound - ); - } else if (nodeEntry.isFailed() == false) { - // if the entry is there, for the right fetching round and not marked as failed already, process it - Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); - // if the request got rejected or timed out, we need to try it again next time... - if (unwrappedCause instanceof OpenSearchRejectedExecutionException - || unwrappedCause instanceof ReceiveTimeoutTransportException - || unwrappedCause instanceof OpenSearchTimeoutException) { - nodeEntry.restartFetching(); - } else { - logger.warn( - () -> new ParameterizedMessage( - "{}: failed to list shard for {} on node [{}]", - reroutingKey, - type, - failure.nodeId() - ), - failure - ); - nodeEntry.doneFetching(failure.getCause()); - } - } - } - } + cache.processFailures(failures, fetchingRound); } reroute(reroutingKey, "post_response"); } + public int getNumberOfInFlightFetches() { + return cache.getInflightFetches(); + } + /** * Implement this in order to scheduled another round that causes a call to fetch data. */ @@ -351,47 +259,6 @@ synchronized void clearCacheForNode(String nodeId) { cache.remove(nodeId); } - /** - * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from - * it nodes that are no longer part of the state. - */ - private void fillShardCacheWithDataNodes(Map> shardCache, DiscoveryNodes nodes) { - // verify that all current data nodes are there - for (final DiscoveryNode node : nodes.getDataNodes().values()) { - if (shardCache.containsKey(node.getId()) == false) { - shardCache.put(node.getId(), new NodeEntry(node.getId())); - } - } - // remove nodes that are not longer part of the data nodes set - shardCache.keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId)); - } - - /** - * Finds all the nodes that need to be fetched. Those are nodes that have no - * data, and are not in fetch mode. - */ - private List> findNodesToFetch(Map> shardCache) { - List> nodesToFetch = new ArrayList<>(); - for (NodeEntry nodeEntry : shardCache.values()) { - if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { - nodesToFetch.add(nodeEntry); - } - } - return nodesToFetch; - } - - /** - * Are there any nodes that are fetching data? - */ - private boolean hasAnyNodeFetching(Map> shardCache) { - for (NodeEntry nodeEntry : shardCache.values()) { - if (nodeEntry.isFetching()) { - return true; - } - } - return false; - } - /** * Async fetches data for the provided shard with the set of nodes that need to be fetched from. */ @@ -460,83 +327,4 @@ public void processAllocation(RoutingAllocation allocation) { } } - - /** - * A node entry, holding the state of the fetched data for a specific shard - * for a giving node. 
- */ - static class NodeEntry { - private final String nodeId; - private boolean fetching; - @Nullable - private T value; - private boolean valueSet; - private Throwable failure; - private long fetchingRound; - - NodeEntry(String nodeId) { - this.nodeId = nodeId; - } - - String getNodeId() { - return this.nodeId; - } - - boolean isFetching() { - return fetching; - } - - void markAsFetching(long fetchingRound) { - assert fetching == false : "double marking a node as fetching"; - this.fetching = true; - this.fetchingRound = fetchingRound; - } - - void doneFetching(T value) { - assert fetching : "setting value but not in fetching mode"; - assert failure == null : "setting value when failure already set"; - this.valueSet = true; - this.value = value; - this.fetching = false; - } - - void doneFetching(Throwable failure) { - assert fetching : "setting value but not in fetching mode"; - assert valueSet == false : "setting failure when already set value"; - assert failure != null : "setting failure can't be null"; - this.failure = failure; - this.fetching = false; - } - - void restartFetching() { - assert fetching : "restarting fetching, but not in fetching mode"; - assert valueSet == false : "value can't be set when restarting fetching"; - assert failure == null : "failure can't be set when restarting fetching"; - this.fetching = false; - } - - boolean isFailed() { - return failure != null; - } - - boolean hasData() { - return valueSet || failure != null; - } - - Throwable getFailure() { - assert hasData() : "getting failure when data has not been fetched"; - return failure; - } - - @Nullable - T getValue() { - assert failure == null : "trying to fetch value, but its marked as failed, check isFailed"; - assert valueSet : "value is not set, hasn't been fetched yet"; - return value; - } - - long getFetchingRound() { - return fetchingRound; - } - } } diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java new file mode 100644 index 0000000000000..6a54ee3b976a9 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -0,0 +1,329 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ExceptionsHelper; +import org.opensearch.OpenSearchTimeoutException; +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.transport.ReceiveTimeoutTransportException; +import reactor.util.annotation.NonNull; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. + * Setting up the cache is required from implementation class. While set up, we need 3 functionalities from the user. + * initData : how to initialize an entry of shard cache for a node. 
+ * putData : how to store the response of transport action in the cache. + * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or + * {@link ReplicaShardAllocator} + * + * @param Response type of transport action which has the data to be stored in the cache. + */ +public abstract class BaseShardCache { + private final Logger logger; + private final String logKey; + private final String type; + + protected BaseShardCache(Logger logger, String logKey, String type) { + this.logger = logger; + this.logKey = logKey; + this.type = type; + } + + /** + * Initialize cache's entry for a node. + * + * @param node for which node we need to initialize the cache. + */ + public abstract void initData(DiscoveryNode node); + + /** + * Store the response in the cache from node. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + public abstract void putData(DiscoveryNode node, K response); + + /** + * Populate the response from cache. + * + * @param node node for which we need the response. + * @return actual response. + */ + public abstract K getData(DiscoveryNode node); + + @NonNull + public abstract Map getCache(); + + public abstract void clearShardCache(ShardId shardId); + + /** + * Returns the number of fetches that are currently ongoing. + */ + public int getInflightFetches() { + int count = 0; + for (BaseNodeEntry nodeEntry : getCache().values()) { + if (nodeEntry.isFetching()) { + count++; + } + } + return count; + } + + /** + * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from + * it nodes that are no longer part of the state. + */ + public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { + // verify that all current data nodes are there + for (final DiscoveryNode node : nodes.getDataNodes().values()) { + if (getCache().containsKey(node.getId()) == false) { + initData(node); + } + } + // remove nodes that are not longer part of the data nodes set + getCache().keySet().removeIf(nodeId -> !nodes.nodeExists(nodeId)); + } + + /** + * Finds all the nodes that need to be fetched. Those are nodes that have no + * data, and are not in fetch mode. + */ + public List findNodesToFetch() { + List nodesToFetch = new ArrayList<>(); + for (BaseNodeEntry nodeEntry : getCache().values()) { + if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { + nodesToFetch.add(nodeEntry.getNodeId()); + } + } + return nodesToFetch; + } + + /** + * Are there any nodes that are fetching data? + */ + public boolean hasAnyNodeFetching() { + for (BaseNodeEntry nodeEntry : getCache().values()) { + if (nodeEntry.isFetching()) { + return true; + } + } + return false; + } + + /** + * Get the data from cache, ignore the failed entries. Use getData functional interface to get the data, as + * different implementations may have different ways to populate the data from cache. + * + * @param nodes Discovery nodes for which we need to return the cache data. + * @param failedNodes return failedNodes with the nodes where fetch has failed. + * @return Map of cache data for every DiscoveryNode. 
+ */ + public Map populateCache(DiscoveryNodes nodes, Set failedNodes) { + Map fetchData = new HashMap<>(); + for (Iterator> it = getCache().entrySet().iterator(); it.hasNext(); ) { + Map.Entry entry = (Map.Entry) it.next(); + String nodeId = entry.getKey(); + BaseNodeEntry nodeEntry = entry.getValue(); + + DiscoveryNode node = nodes.get(nodeId); + if (node != null) { + if (nodeEntry.isFailed()) { + // if its failed, remove it from the list of nodes, so if this run doesn't work + // we try again next round to fetch it again + it.remove(); + failedNodes.add(nodeEntry.getNodeId()); + } else { + K nodeResponse = getData(node); + if (nodeResponse != null) { + fetchData.put(node, nodeResponse); + } + } + } + } + return fetchData; + } + + public void processResponses(List responses, long fetchingRound) { + for (K response : responses) { + BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); + if (nodeEntry != null) { + if (validateNodeResponse(nodeEntry, fetchingRound)) { + // if the entry is there, for the right fetching round and not marked as failed already, process it + logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type, + response); + putData(response.getNode(), response); + } + } + } + } + + public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) { + if (nodeEntry.getFetchingRound() != fetchingRound) { + assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; + logger.trace( + "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", + logKey, + nodeEntry.getNodeId(), + type, + nodeEntry.getFetchingRound(), + fetchingRound + ); + return false; + } else if (nodeEntry.isFailed()) { + logger.trace( + "{} node {} has failed for [{}] (failure [{}])", + logKey, + nodeEntry.getNodeId(), + type, + nodeEntry.getFailure() + ); + return false; + } + return true; + } + + public void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { + if (nodeEntry.getFetchingRound() != fetchingRound) { + assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; + logger.trace( + "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", + logKey, + nodeEntry.getNodeId(), + type, + nodeEntry.getFetchingRound(), + fetchingRound + ); + } else if (nodeEntry.isFailed() == false) { + // if the entry is there, for the right fetching round and not marked as failed already, process it + Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); + // if the request got rejected or timed out, we need to try it again next time... 
+ if (unwrappedCause instanceof OpenSearchRejectedExecutionException + || unwrappedCause instanceof ReceiveTimeoutTransportException + || unwrappedCause instanceof OpenSearchTimeoutException) { + nodeEntry.restartFetching(); + } else { + logger.warn( + () -> new ParameterizedMessage( + "{}: failed to list shard for {} on node [{}]", + logKey, + type, + failure.nodeId() + ), + failure + ); + nodeEntry.doneFetching(failure.getCause()); + } + } + } + + public void processFailures(List failures, long fetchingRound) { + for (FailedNodeException failure : failures) { + logger.trace("{} processing failure {} for [{}]", logKey, failure, type); + BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); + if (nodeEntry != null) { + handleNodeFailure(nodeEntry, failure, fetchingRound); + } + } + } + + public void remove(String nodeId) { + this.getCache().remove(nodeId); + } + + public void markAsFetching(List nodeIds, long fetchingRound) { + for (String nodeId : nodeIds) { + getCache().get(nodeId).markAsFetching(fetchingRound); + } + } + + + /** + * A node entry, holding only node level fetching related information. + * Actual metadata of shard is stored in child classes. + */ + static class BaseNodeEntry { + private final String nodeId; + private boolean fetching; + private boolean valueSet; + private Throwable failure; + private long fetchingRound; + + BaseNodeEntry(String nodeId) { + this.nodeId = nodeId; + } + + String getNodeId() { + return this.nodeId; + } + + boolean isFetching() { + return fetching; + } + + void markAsFetching(long fetchingRound) { + assert fetching == false : "double marking a node as fetching"; + this.fetching = true; + this.fetchingRound = fetchingRound; + } + + void doneFetching() { + assert fetching : "setting value but not in fetching mode"; + assert failure == null : "setting value when failure already set"; + this.valueSet = true; + this.fetching = false; + } + + void doneFetching(Throwable failure) { + assert fetching : "setting value but not in fetching mode"; + assert valueSet == false : "setting failure when already set value"; + assert failure != null : "setting failure can't be null"; + this.failure = failure; + this.fetching = false; + } + + void restartFetching() { + assert fetching : "restarting fetching, but not in fetching mode"; + assert valueSet == false : "value can't be set when restarting fetching"; + assert failure == null : "failure can't be set when restarting fetching"; + this.fetching = false; + } + + boolean isFailed() { + return failure != null; + } + + boolean hasData() { + return valueSet || failure != null; + } + + Throwable getFailure() { + assert hasData() : "getting failure when data has not been fetched"; + return failure; + } + + long getFetchingRound() { + return fetchingRound; + } + } +} diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java new file mode 100644 index 0000000000000..8526b8cfb84ac --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.gateway; + +import org.apache.logging.log4j.Logger; +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Nullable; +import org.opensearch.core.index.shard.ShardId; + +import java.util.HashMap; +import java.util.Map; + +/** + * Cache implementation of transport actions returning single shard related data in the response. + * + * @param Response type of transport action. + */ +public class ShardCache extends BaseShardCache { + + private final Map> cache = new HashMap<>(); + + public ShardCache(Logger logger, String logKey, String type) { + super(logger, logKey, type); + } + + @Override + public void initData(DiscoveryNode node) { + cache.put(node.getId(), new NodeEntry<>(node.getId())); + } + + @Override + public void putData(DiscoveryNode node, K response) { + cache.get(node.getId()).doneFetching(response); + } + + @Override + public K getData(DiscoveryNode node) { + return cache.get(node.getId()).getValue(); + } + + @Override + public Map getCache() { + return cache; + } + + @Override + public void clearShardCache(ShardId shardId) { + cache.clear(); + } + + /** + * A node entry, holding the state of the fetched data for a specific shard + * for a giving node. + */ + static class NodeEntry extends BaseShardCache.BaseNodeEntry { + @Nullable + private T value; + + void doneFetching(T value) { + super.doneFetching(); + this.value = value; + } + + NodeEntry(String nodeId) { + super(nodeId); + } + + T getValue() { + return value; + } + + } +} From 608c8ae5fce90359010da04253b0ebf19efb0740 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 23 Feb 2024 18:21:18 +0530 Subject: [PATCH 02/11] spotless apply Signed-off-by: Aman Khare --- .../opensearch/gateway/AsyncShardFetch.java | 11 +-------- .../opensearch/gateway/BaseShardCache.java | 24 +++++-------------- 2 files changed, 7 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 4de8447f07421..5ad55db7ac571 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -32,27 +32,20 @@ package org.opensearch.gateway; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchTimeoutException; import org.opensearch.action.FailedNodeException; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.action.support.nodes.BaseNodesResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.common.Nullable; import org.opensearch.common.lease.Releasable; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.store.ShardAttributes; -import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -183,9 +176,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map populateCache(DiscoveryNodes nodes, 
Set failedNodes) { Map fetchData = new HashMap<>(); - for (Iterator> it = getCache().entrySet().iterator(); it.hasNext(); ) { + for (Iterator> it = getCache().entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); String nodeId = entry.getKey(); BaseNodeEntry nodeEntry = entry.getValue(); @@ -170,8 +171,7 @@ public void processResponses(List responses, long fetchingRound) { if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type, - response); + logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type, response); putData(response.getNode(), response); } } @@ -191,13 +191,7 @@ public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) ); return false; } else if (nodeEntry.isFailed()) { - logger.trace( - "{} node {} has failed for [{}] (failure [{}])", - logKey, - nodeEntry.getNodeId(), - type, - nodeEntry.getFailure() - ); + logger.trace("{} node {} has failed for [{}] (failure [{}])", logKey, nodeEntry.getNodeId(), type, nodeEntry.getFailure()); return false; } return true; @@ -224,12 +218,7 @@ public void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failu nodeEntry.restartFetching(); } else { logger.warn( - () -> new ParameterizedMessage( - "{}: failed to list shard for {} on node [{}]", - logKey, - type, - failure.nodeId() - ), + () -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]", logKey, type, failure.nodeId()), failure ); nodeEntry.doneFetching(failure.getCause()); @@ -257,7 +246,6 @@ public void markAsFetching(List nodeIds, long fetchingRound) { } } - /** * A node entry, holding only node level fetching related information. * Actual metadata of shard is stored in child classes. 
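[Illustrative recap, not part of the diff: the fetch round that fetchData() now drives through the cache API extracted above. This mirrors the patched code rather than adding behavior.]

    cache.fillShardCacheWithDataNodes(nodes);          // create entries for new data nodes, drop departed ones
    List<String> nodeIds = cache.findNodesToFetch();   // entries with no data that are not already fetching
    if (nodeIds.isEmpty() == false) {
        final long fetchingRound = round.incrementAndGet();
        cache.markAsFetching(nodeIds, fetchingRound);  // stamp entries so stale replies can be detected
        DiscoveryNode[] discoNodesToFetch = nodeIds.stream().map(nodes::get).toArray(DiscoveryNode[]::new);
        asyncFetch(discoNodesToFetch, fetchingRound);  // replies and failures return via processAsyncFetch()
    }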
From a6921f05535accbbdabf2b62054b46774e388ee5 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 23 Feb 2024 19:01:59 +0530 Subject: [PATCH 03/11] Add synchronized block Signed-off-by: Aman Khare --- .../src/main/java/org/opensearch/gateway/AsyncShardFetch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 5ad55db7ac571..068b6c620983c 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -234,7 +234,7 @@ protected synchronized void processAsyncFetch(List responses, List Date: Wed, 28 Feb 2024 14:30:00 +0530 Subject: [PATCH 04/11] Add support for handling failed shards in the cache Signed-off-by: Aman Khare --- .../org/opensearch/gateway/AsyncShardFetch.java | 14 ++++++++++++++ .../org/opensearch/gateway/BaseShardCache.java | 7 +++++++ .../java/org/opensearch/gateway/ShardCache.java | 8 ++++++++ 3 files changed, 29 insertions(+) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 068b6c620983c..e35c3f9414c97 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -170,6 +170,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map failedShards = clearFailedShards(); List nodeIds = cache.findNodesToFetch(); if (nodeIds.isEmpty() == false) { // mark all node as fetching and go ahead and async fetch them @@ -205,12 +206,25 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map(fetchData, allIgnoreNodesMap); } } + private List clearFailedShards() { + // get failed shards from previous fetch and remove them + List failedShards = cache.getFailedShards(); + if (failedShards !=null && failedShards.isEmpty() == false ) { + shardAttributesMap.keySet().removeIf(failedShards::contains); + } + return failedShards; + } + /** * Called by the response handler of the async action to fetch data. Verifies that its still working * on the same cache generation, otherwise the results are discarded. 
It then goes and fills the relevant data for diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index f9de73fbaf9ef..c34d00b0877d1 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -73,6 +73,13 @@ protected BaseShardCache(Logger logger, String logKey, String type) { */ public abstract K getData(DiscoveryNode node); + /** + * Provide the list of shards which got failures, these shards should be removed + * @return list of failed shards + */ + public abstract List getFailedShards(); + + @NonNull public abstract Map getCache(); diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index 8526b8cfb84ac..a5b23df3bf3b8 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -14,7 +14,9 @@ import org.opensearch.common.Nullable; import org.opensearch.core.index.shard.ShardId; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -55,6 +57,12 @@ public void clearShardCache(ShardId shardId) { cache.clear(); } + @Override + public List getFailedShards() { + // Single shard cache does not need to return that shard itself because handleFailure will take care of retries + return Collections.emptyList(); + } + /** * A node entry, holding the state of the fetched data for a specific shard * for a giving node. From 5b1770405678d699dbb7d73022bd6d8f05258250 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 28 Feb 2024 16:20:17 +0530 Subject: [PATCH 05/11] spotless style apply Signed-off-by: Aman Khare --- .../src/main/java/org/opensearch/gateway/AsyncShardFetch.java | 4 ++-- .../src/main/java/org/opensearch/gateway/BaseShardCache.java | 1 - 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index e35c3f9414c97..0f380f5559d6b 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -209,7 +209,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map(fetchData, allIgnoreNodesMap); @@ -219,7 +219,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map clearFailedShards() { // get failed shards from previous fetch and remove them List failedShards = cache.getFailedShards(); - if (failedShards !=null && failedShards.isEmpty() == false ) { + if (failedShards != null && failedShards.isEmpty() == false) { shardAttributesMap.keySet().removeIf(failedShards::contains); } return failedShards; diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index c34d00b0877d1..488be7688993c 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -79,7 +79,6 @@ protected BaseShardCache(Logger logger, String logKey, String type) { */ public abstract List getFailedShards(); - @NonNull public abstract Map getCache(); From 11e145d1c4f3557466f9e0ba425f323f29748a44 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 1 Mar 2024 12:20:41 +0530 Subject: [PATCH 06/11] Remove failed 
shard handling from base class as it should be there in child class Signed-off-by: Aman Khare --- .../org/opensearch/gateway/AsyncShardFetch.java | 14 -------------- .../org/opensearch/gateway/BaseShardCache.java | 6 +++--- .../java/org/opensearch/gateway/ShardCache.java | 3 ++- 3 files changed, 5 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 0f380f5559d6b..068b6c620983c 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -170,7 +170,6 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map failedShards = clearFailedShards(); List nodeIds = cache.findNodesToFetch(); if (nodeIds.isEmpty() == false) { // mark all node as fetching and go ahead and async fetch them @@ -206,25 +205,12 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map(fetchData, allIgnoreNodesMap); } } - private List clearFailedShards() { - // get failed shards from previous fetch and remove them - List failedShards = cache.getFailedShards(); - if (failedShards != null && failedShards.isEmpty() == false) { - shardAttributesMap.keySet().removeIf(failedShards::contains); - } - return failedShards; - } - /** * Called by the response handler of the async action to fetch data. Verifies that its still working * on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index 488be7688993c..bc8df9043be2b 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -74,7 +74,7 @@ protected BaseShardCache(Logger logger, String logKey, String type) { public abstract K getData(DiscoveryNode node); /** - * Provide the list of shards which got failures, these shards should be removed + * Provide the list of shards which got failures, these shards should be retried * @return list of failed shards */ public abstract List getFailedShards(); @@ -184,7 +184,7 @@ public void processResponses(List responses, long fetchingRound) { } } - public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) { + private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) { if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( @@ -203,7 +203,7 @@ public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) return true; } - public void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { + private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) { if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index a5b23df3bf3b8..9669c0e24e7c9 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -26,10 +26,11 @@ */ public class 
ShardCache extends BaseShardCache { - private final Map> cache = new HashMap<>(); + private final Map> cache; public ShardCache(Logger logger, String logKey, String type) { super(logger, logKey, type); + cache = new HashMap<>(); } @Override From dca5b7ce8018578cc19c1f80dcff5b05235f9811 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Tue, 5 Mar 2024 17:58:48 +0530 Subject: [PATCH 07/11] Add NodeCache interface to make code more readable and remove failed shard handling from single shard cache Signed-off-by: Aman Khare --- .../opensearch/gateway/BaseShardCache.java | 78 ++++++------------- .../org/opensearch/gateway/NodeCache.java | 71 +++++++++++++++++ .../org/opensearch/gateway/ShardCache.java | 15 ++-- 3 files changed, 99 insertions(+), 65 deletions(-) create mode 100644 server/src/main/java/org/opensearch/gateway/NodeCache.java diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index bc8df9043be2b..d0fe209bbd00f 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -17,7 +17,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -27,19 +26,13 @@ import java.util.Map; import java.util.Set; -import reactor.util.annotation.NonNull; - /** * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. - * Setting up the cache is required from implementation class. While set up, we need 3 functionalities from the user. - * initData : how to initialize an entry of shard cache for a node. - * putData : how to store the response of transport action in the cache. - * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or - * {@link ReplicaShardAllocator} + * Setting up the cache is required from implementation class. * * @param Response type of transport action which has the data to be stored in the cache. */ -public abstract class BaseShardCache { +public abstract class BaseShardCache implements NodeCache { private final Logger logger; private final String logKey; private final String type; @@ -50,44 +43,10 @@ protected BaseShardCache(Logger logger, String logKey, String type) { this.type = type; } - /** - * Initialize cache's entry for a node. - * - * @param node for which node we need to initialize the cache. - */ - public abstract void initData(DiscoveryNode node); - - /** - * Store the response in the cache from node. - * - * @param node node from which we got the response. - * @param response shard metadata coming from node. - */ - public abstract void putData(DiscoveryNode node, K response); - - /** - * Populate the response from cache. - * - * @param node node for which we need the response. - * @return actual response. - */ - public abstract K getData(DiscoveryNode node); - - /** - * Provide the list of shards which got failures, these shards should be retried - * @return list of failed shards - */ - public abstract List getFailedShards(); - - @NonNull - public abstract Map getCache(); - - public abstract void clearShardCache(ShardId shardId); - /** * Returns the number of fetches that are currently ongoing. 
*/ - public int getInflightFetches() { + int getInflightFetches() { int count = 0; for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.isFetching()) { @@ -101,7 +60,7 @@ public int getInflightFetches() { * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from * it nodes that are no longer part of the state. */ - public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { + void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { // verify that all current data nodes are there for (final DiscoveryNode node : nodes.getDataNodes().values()) { if (getCache().containsKey(node.getId()) == false) { @@ -116,7 +75,7 @@ public void fillShardCacheWithDataNodes(DiscoveryNodes nodes) { * Finds all the nodes that need to be fetched. Those are nodes that have no * data, and are not in fetch mode. */ - public List findNodesToFetch() { + List findNodesToFetch() { List nodesToFetch = new ArrayList<>(); for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) { @@ -129,7 +88,7 @@ public List findNodesToFetch() { /** * Are there any nodes that are fetching data? */ - public boolean hasAnyNodeFetching() { + boolean hasAnyNodeFetching() { for (BaseNodeEntry nodeEntry : getCache().values()) { if (nodeEntry.isFetching()) { return true; @@ -146,7 +105,7 @@ public boolean hasAnyNodeFetching() { * @param failedNodes return failedNodes with the nodes where fetch has failed. * @return Map of cache data for every DiscoveryNode. */ - public Map populateCache(DiscoveryNodes nodes, Set failedNodes) { + Map populateCache(DiscoveryNodes nodes, Set failedNodes) { Map fetchData = new HashMap<>(); for (Iterator> it = getCache().entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); @@ -171,7 +130,7 @@ public Map populateCache(DiscoveryNodes nodes, Set fai return fetchData; } - public void processResponses(List responses, long fetchingRound) { + void processResponses(List responses, long fetchingRound) { for (K response : responses) { BaseNodeEntry nodeEntry = getCache().get(response.getNode().getId()); if (nodeEntry != null) { @@ -218,9 +177,7 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail // if the entry is there, for the right fetching round and not marked as failed already, process it Throwable unwrappedCause = ExceptionsHelper.unwrapCause(failure.getCause()); // if the request got rejected or timed out, we need to try it again next time... 
- if (unwrappedCause instanceof OpenSearchRejectedExecutionException - || unwrappedCause instanceof ReceiveTimeoutTransportException - || unwrappedCause instanceof OpenSearchTimeoutException) { + if (retryableException(unwrappedCause)) { nodeEntry.restartFetching(); } else { logger.warn( @@ -232,7 +189,13 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail } } - public void processFailures(List failures, long fetchingRound) { + boolean retryableException(Throwable unwrappedCause) { + return unwrappedCause instanceof OpenSearchRejectedExecutionException + || unwrappedCause instanceof ReceiveTimeoutTransportException + || unwrappedCause instanceof OpenSearchTimeoutException; + } + + void processFailures(List failures, long fetchingRound) { for (FailedNodeException failure : failures) { logger.trace("{} processing failure {} for [{}]", logKey, failure, type); BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); @@ -242,11 +205,16 @@ public void processFailures(List failures, long fetchingRou } } - public void remove(String nodeId) { + /** + * Common function for removing whole node entry. + * + * @param nodeId nodeId to be cleaned. + */ + void remove(String nodeId) { this.getCache().remove(nodeId); } - public void markAsFetching(List nodeIds, long fetchingRound) { + void markAsFetching(List nodeIds, long fetchingRound) { for (String nodeId : nodeIds) { getCache().get(nodeId).markAsFetching(fetchingRound); } diff --git a/server/src/main/java/org/opensearch/gateway/NodeCache.java b/server/src/main/java/org/opensearch/gateway/NodeCache.java new file mode 100644 index 0000000000000..25f97e72a5d0c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/NodeCache.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.support.nodes.BaseNodeResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; + +import java.util.Map; + +import reactor.util.annotation.NonNull; + +/** + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or + * {@link TransportNodesListShardStoreMetadata} using the given functionalities. + *
<p>
+ * initData : how to initialize an entry of shard cache for a node. + * putData : how to store the response of transport action in the cache. + * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or + * {@link ReplicaShardAllocator} + * + * @param Response type of transport action which has the data to be stored in the cache. + */ +public interface NodeCache { + + /** + * Initialize cache's entry for a node. + * + * @param node for which node we need to initialize the cache. + */ + void initData(DiscoveryNode node); + + /** + * Store the response in the cache from node. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + void putData(DiscoveryNode node, K response); + + /** + * Populate the response from cache. + * + * @param node node for which we need the response. + * @return actual response. + */ + K getData(DiscoveryNode node); + + /** + * Get actual map object of the cache + * + * @return map of nodeId and NodeEntry extending BaseNodeEntry + */ + @NonNull + Map getCache(); + + /** + * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will + * automatically be cleaned up once shards are assigned. + * + * @param shardId for which we need to free up the cached data. + */ + void deleteData(ShardId shardId); +} diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index 9669c0e24e7c9..a494c68074ba9 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -14,11 +14,11 @@ import org.opensearch.common.Nullable; import org.opensearch.core.index.shard.ShardId; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; +import reactor.util.annotation.NonNull; + /** * Cache implementation of transport actions returning single shard related data in the response. 
* @@ -48,20 +48,15 @@ public K getData(DiscoveryNode node) { return cache.get(node.getId()).getValue(); } + @NonNull @Override public Map getCache() { return cache; } @Override - public void clearShardCache(ShardId shardId) { - cache.clear(); - } - - @Override - public List getFailedShards() { - // Single shard cache does not need to return that shard itself because handleFailure will take care of retries - return Collections.emptyList(); + public void deleteData(ShardId shardId) { + cache.clear(); // single shard cache can clear the full map } /** From 450617f0cfbe8db3091e35f34c3965273eb3b5a3 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 6 Mar 2024 16:21:50 +0530 Subject: [PATCH 08/11] Move the cache methods back to BaseShardCache and delete NodeCache Signed-off-by: Aman Khare --- .../opensearch/gateway/AsyncShardFetch.java | 15 +--- .../opensearch/gateway/BaseShardCache.java | 76 +++++++++++++++---- .../org/opensearch/gateway/NodeCache.java | 71 ----------------- .../org/opensearch/gateway/ShardCache.java | 3 +- 4 files changed, 66 insertions(+), 99 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/gateway/NodeCache.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 068b6c620983c..76be6b78c3491 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -58,11 +58,8 @@ * Allows to asynchronously fetch shard related data from other nodes for allocation, without blocking * the cluster update thread. *
<p>
- * The async fetch logic maintains a map of which nodes are being fetched from in an async manner, - * and once the results are back, it makes sure to schedule a reroute to make sure those results will - * be taken into account. - * - * It comes in two modes, to single fetch a shard or fetch a batch of shards. + * The async fetch logic maintains a cache {@link BaseShardCache} which is filled in async manner when nodes respond back. + * It also schedules a reroute to make sure those results will be taken into account. * @opensearch.internal */ public abstract class AsyncShardFetch implements Releasable { @@ -85,8 +82,6 @@ public interface Lister, N private final String reroutingKey; private final Map> shardToIgnoreNodes = new HashMap<>(); - private final boolean enableBatchMode; - @SuppressWarnings("unchecked") protected AsyncShardFetch( Logger logger, @@ -101,7 +96,6 @@ protected AsyncShardFetch( shardAttributesMap.put(shardId, new ShardAttributes(shardId, customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; - enableBatchMode = false; cache = new ShardCache<>(logger, reroutingKey, type); } @@ -127,7 +121,6 @@ protected AsyncShardFetch( this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; - enableBatchMode = true; cache = new ShardCache<>(logger, reroutingKey, type); } @@ -148,7 +141,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map 1) { throw new IllegalStateException( @@ -186,7 +179,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map failedNodes = new HashSet<>(); - Map fetchData = cache.populateCache(nodes, failedNodes); + Map fetchData = cache.getCacheData(nodes, failedNodes); Map> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes)); // clear the nodes to ignore, we had a successful run in fetching everything we can diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index d0fe209bbd00f..62c5564737a81 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -17,6 +17,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -26,23 +28,70 @@ import java.util.Map; import java.util.Set; +import reactor.util.annotation.NonNull; + /** * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. * Setting up the cache is required from implementation class. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or + * {@link TransportNodesListShardStoreMetadata} using the given functionalities. + *
<p>
From 91e094af8b060edfaf45720d95b19d91e33b1b1d Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Wed, 6 Mar 2024 11:50:49 +0000 Subject: [PATCH 09/11] Move the cache methods back to BaseShardCache and delete NodeCache Signed-off-by: Aman Khare --- .../opensearch/gateway/AsyncShardFetch.java | 8 +- .../opensearch/gateway/BaseShardCache.java | 76 +++++++++++++++---- .../org/opensearch/gateway/NodeCache.java | 71 ----------------- .../org/opensearch/gateway/ShardCache.java | 3 +- 4 files changed, 64 insertions(+), 94 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/gateway/NodeCache.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 068b6c620983c..096fff228f307 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -85,8 +85,6 @@ public interface Lister, N private final String reroutingKey; private final Map> shardToIgnoreNodes = new HashMap<>(); - private final boolean enableBatchMode; - @SuppressWarnings("unchecked") protected AsyncShardFetch( Logger logger, @@ -101,7 +99,6 @@ protected AsyncShardFetch( shardAttributesMap.put(shardId, new
ShardAttributes(shardId, customDataPath)); this.action = (Lister, T>) action; this.reroutingKey = "ShardId=[" + shardId.toString() + "]"; - enableBatchMode = false; cache = new ShardCache<>(logger, reroutingKey, type); } @@ -127,7 +124,6 @@ protected AsyncShardFetch( this.shardAttributesMap = shardAttributesMap; this.action = (Lister, T>) action; this.reroutingKey = "BatchID=[" + batchId + "]"; - enableBatchMode = true; cache = new ShardCache<>(logger, reroutingKey, type); } @@ -148,7 +144,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map 1) { throw new IllegalStateException( @@ -186,7 +182,7 @@ public synchronized FetchResult fetchData(DiscoveryNodes nodes, Map failedNodes = new HashSet<>(); - Map fetchData = cache.populateCache(nodes, failedNodes); + Map fetchData = cache.getCacheData(nodes, failedNodes); Map> allIgnoreNodesMap = unmodifiableMap(new HashMap<>(shardToIgnoreNodes)); // clear the nodes to ignore, we had a successful run in fetching everything we can diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java index d0fe209bbd00f..62c5564737a81 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/BaseShardCache.java @@ -17,6 +17,8 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -26,23 +28,70 @@ import java.util.Map; import java.util.Set; +import reactor.util.annotation.NonNull; + /** * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. * Setting up the cache is required from implementation class. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or + * {@link TransportNodesListShardStoreMetadata} using the given functionalities. + *

+ * initData : how to initialize an entry of shard cache for a node. + * putData : how to store the response of transport action in the cache. + * getData : how to get the stored data for any shard allocators like {@link PrimaryShardAllocator} or + * {@link ReplicaShardAllocator} + * deleteData : how to clean up the stored data from cache for a shard. * * @param Response type of transport action which has the data to be stored in the cache. */ -public abstract class BaseShardCache implements NodeCache { +public abstract class BaseShardCache { private final Logger logger; - private final String logKey; private final String type; - protected BaseShardCache(Logger logger, String logKey, String type) { + protected BaseShardCache(Logger logger, String type) { this.logger = logger; - this.logKey = logKey; this.type = type; } + /** + * Initialize cache's entry for a node. + * + * @param node for which node we need to initialize the cache. + */ + abstract void initData(DiscoveryNode node); + + /** + * Store the response in the cache from node. + * + * @param node node from which we got the response. + * @param response shard metadata coming from node. + */ + abstract void putData(DiscoveryNode node, K response); + + /** + * Populate the response from cache. + * + * @param node node for which we need the response. + * @return actual response. + */ + abstract K getData(DiscoveryNode node); + + /** + * Get actual map object of the cache + * + * @return map of nodeId and NodeEntry extending BaseNodeEntry + */ + @NonNull + abstract Map getCache(); + + /** + * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will + * automatically be cleaned up once shards are assigned. + * + * @param shardId for which we need to free up the cached data. + */ + abstract void deleteData(ShardId shardId); + /** * Returns the number of fetches that are currently ongoing. */ @@ -105,7 +154,7 @@ boolean hasAnyNodeFetching() { * @param failedNodes return failedNodes with the nodes where fetch has failed. * @return Map of cache data for every DiscoveryNode. 
*/ - Map populateCache(DiscoveryNodes nodes, Set failedNodes) { + Map getCacheData(DiscoveryNodes nodes, Set failedNodes) { Map fetchData = new HashMap<>(); for (Iterator> it = getCache().entrySet().iterator(); it.hasNext();) { Map.Entry entry = (Map.Entry) it.next(); @@ -136,7 +185,7 @@ void processResponses(List responses, long fetchingRound) { if (nodeEntry != null) { if (validateNodeResponse(nodeEntry, fetchingRound)) { // if the entry is there, for the right fetching round and not marked as failed already, process it - logger.trace("{} marking {} as done for [{}], result is [{}]", logKey, nodeEntry.getNodeId(), type, response); + logger.trace("marking {} as done for [{}], result is [{}]", nodeEntry.getNodeId(), type, response); putData(response.getNode(), response); } } @@ -147,8 +196,7 @@ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( - "{} received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", - logKey, + "received response for [{}] from node {} for an older fetching round (expected: {} but was: {})", nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -156,7 +204,7 @@ private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound ); return false; } else if (nodeEntry.isFailed()) { - logger.trace("{} node {} has failed for [{}] (failure [{}])", logKey, nodeEntry.getNodeId(), type, nodeEntry.getFailure()); + logger.trace("node {} has failed for [{}] (failure [{}])", nodeEntry.getNodeId(), type, nodeEntry.getFailure()); return false; } return true; @@ -166,8 +214,7 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail if (nodeEntry.getFetchingRound() != fetchingRound) { assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds"; logger.trace( - "{} received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", - logKey, + "received failure for [{}] from node {} for an older fetching round (expected: {} but was: {})", nodeEntry.getNodeId(), type, nodeEntry.getFetchingRound(), @@ -180,10 +227,7 @@ private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException fail if (retryableException(unwrappedCause)) { nodeEntry.restartFetching(); } else { - logger.warn( - () -> new ParameterizedMessage("{}: failed to list shard for {} on node [{}]", logKey, type, failure.nodeId()), - failure - ); + logger.warn(() -> new ParameterizedMessage("failed to list shard for {} on node [{}]", type, failure.nodeId()), failure); nodeEntry.doneFetching(failure.getCause()); } } @@ -197,7 +241,7 @@ boolean retryableException(Throwable unwrappedCause) { void processFailures(List failures, long fetchingRound) { for (FailedNodeException failure : failures) { - logger.trace("{} processing failure {} for [{}]", logKey, failure, type); + logger.trace("processing failure {} for [{}]", failure, type); BaseNodeEntry nodeEntry = getCache().get(failure.nodeId()); if (nodeEntry != null) { handleNodeFailure(nodeEntry, failure, fetchingRound); diff --git a/server/src/main/java/org/opensearch/gateway/NodeCache.java b/server/src/main/java/org/opensearch/gateway/NodeCache.java deleted file mode 100644 index 25f97e72a5d0c..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/NodeCache.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * 
SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway; - -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; - -import java.util.Map; - -import reactor.util.annotation.NonNull; - -/** - * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or - * {@link TransportNodesListShardStoreMetadata} using the given functionalities. - *

- * initData : how to initialize an entry of shard cache for a node. - * putData : how to store the response of transport action in the cache. - * getData : how to populate the stored data for any shard allocators like {@link PrimaryShardAllocator} or - * {@link ReplicaShardAllocator} - * - * @param Response type of transport action which has the data to be stored in the cache. - */ -public interface NodeCache { - - /** - * Initialize cache's entry for a node. - * - * @param node for which node we need to initialize the cache. - */ - void initData(DiscoveryNode node); - - /** - * Store the response in the cache from node. - * - * @param node node from which we got the response. - * @param response shard metadata coming from node. - */ - void putData(DiscoveryNode node, K response); - - /** - * Populate the response from cache. - * - * @param node node for which we need the response. - * @return actual response. - */ - K getData(DiscoveryNode node); - - /** - * Get actual map object of the cache - * - * @return map of nodeId and NodeEntry extending BaseNodeEntry - */ - @NonNull - Map getCache(); - - /** - * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will - * automatically be cleaned up once shards are assigned. - * - * @param shardId for which we need to free up the cached data. - */ - void deleteData(ShardId shardId); -} diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java index a494c68074ba9..42c9b22f553bc 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/ShardCache.java @@ -12,6 +12,7 @@ import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.index.shard.ShardId; import java.util.HashMap; @@ -29,7 +30,7 @@ public class ShardCache extends BaseShardCache { private final Map> cache; public ShardCache(Logger logger, String logKey, String type) { - super(logger, logKey, type); + super(Loggers.getLogger(logger, "_" + logKey), type); cache = new HashMap<>(); } From 80e0c8b3d477b183e3c421e4bc8354c32811a771 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 8 Mar 2024 16:29:59 +0530 Subject: [PATCH 10/11] Move ShardCache class as inner class of AsyncShardFetch and rename base class to make it specific Signed-off-by: Aman Khare --- .../opensearch/gateway/AsyncShardFetch.java | 84 ++++++++++++++++-- ...rdCache.java => AsyncShardFetchCache.java} | 41 +++------ .../org/opensearch/gateway/ShardCache.java | 85 ------------------- 3 files changed, 88 insertions(+), 122 deletions(-) rename server/src/main/java/org/opensearch/gateway/{BaseShardCache.java => AsyncShardFetchCache.java} (89%) delete mode 100644 server/src/main/java/org/opensearch/gateway/ShardCache.java diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java index 76be6b78c3491..3d129d4794a10 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java @@ -38,10 +38,13 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.Nullable; import 
org.opensearch.common.lease.Releasable; +import org.opensearch.common.logging.Loggers; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.store.ShardAttributes; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import java.util.ArrayList; import java.util.HashMap; @@ -51,6 +54,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import reactor.util.annotation.NonNull; + import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; @@ -58,8 +63,9 @@ * Allows to asynchronously fetch shard related data from other nodes for allocation, without blocking * the cluster update thread. *

- * The async fetch logic maintains a cache {@link BaseShardCache} which is filled in async manner when nodes respond back. + * The async fetch logic maintains a cache {@link AsyncShardFetchCache} which is filled in async manner when nodes respond back. * It also schedules a reroute to make sure those results will be taken into account. + * * @opensearch.internal */ public abstract class AsyncShardFetch implements Releasable { @@ -76,7 +82,7 @@ public interface Lister, N protected final String type; protected final Map shardAttributesMap; private final Lister, T> action; - private final BaseShardCache cache; + private final AsyncShardFetchCache cache; private final AtomicLong round = new AtomicLong(); private boolean closed; private final String reroutingKey; @@ -102,11 +108,11 @@ protected AsyncShardFetch( /** * Added to fetch a batch of shards from nodes * - * @param logger Logger - * @param type type of action + * @param logger Logger + * @param type type of action * @param shardAttributesMap Map of {@link ShardId} to {@link ShardAttributes} to perform fetching on them a - * @param action Transport Action - * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification + * @param action Transport Action + * @param batchId For the given ShardAttributesMap, we expect them to tie with a single batch id for logging and later identification */ @SuppressWarnings("unchecked") protected AsyncShardFetch( @@ -266,6 +272,72 @@ public void onFailure(Exception e) { }); } + /** + * Cache implementation of transport actions returning single shard related data in the response. + * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or + * {@link TransportNodesListShardStoreMetadata}. + * + * @param Response type of transport action. + */ + static class ShardCache extends AsyncShardFetchCache { + + private final Map> cache; + + public ShardCache(Logger logger, String logKey, String type) { + super(Loggers.getLogger(logger, "_" + logKey), type); + cache = new HashMap<>(); + } + + @Override + public void initData(DiscoveryNode node) { + cache.put(node.getId(), new NodeEntry<>(node.getId())); + } + + @Override + public void putData(DiscoveryNode node, K response) { + cache.get(node.getId()).doneFetching(response); + } + + @Override + public K getData(DiscoveryNode node) { + return cache.get(node.getId()).getValue(); + } + + @NonNull + @Override + public Map getCache() { + return cache; + } + + @Override + public void deleteShard(ShardId shardId) { + cache.clear(); // single shard cache can clear the full map + } + + /** + * A node entry, holding the state of the fetched data for a specific shard + * for a given node. + */ + static class NodeEntry extends AsyncShardFetchCache.BaseNodeEntry { + @Nullable + private U value; + + void doneFetching(U value) { + super.doneFetching(); + this.value = value; + } + + NodeEntry(String nodeId) { + super(nodeId); + } + + U getValue() { + return value; + } + + } + } + /** * The result of a fetch operation. Make sure to first check {@link #hasData()} before * fetching the actual data.
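The fetching-round bookkeeping threaded through these hunks is the stale-response protection: each async fetch stamps the targeted node entries with a fresh round id from an AtomicLong, and validateNodeResponse only lets a response through if its round still matches the entry's. A minimal, self-contained sketch of the same idea, assuming nothing from the OpenSearch API (RoundStampedCache and its methods are illustrative stand-ins, not classes from this patch):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Standalone illustration of round-based stale-response detection; names
// are hypothetical and only mirror the shape of the code in this series.
class RoundStampedCache<V> {
    private final AtomicLong round = new AtomicLong();
    private final Map<String, Entry<V>> entries = new HashMap<>();

    private static final class Entry<V> {
        long fetchingRound;   // round this entry was last marked as fetching in
        boolean fetching;
        V value;
    }

    // Stamp a node entry with a fresh, monotonically increasing round id.
    synchronized long markAsFetching(String nodeId) {
        long current = round.incrementAndGet();
        Entry<V> entry = entries.computeIfAbsent(nodeId, id -> new Entry<>());
        entry.fetching = true;
        entry.fetchingRound = current;
        return current;
    }

    // Apply a response only if it belongs to the round the entry is waiting
    // on; an older round means a newer fetch owns the entry, so drop it.
    synchronized boolean applyResponse(String nodeId, long fetchingRound, V value) {
        Entry<V> entry = entries.get(nodeId);
        if (entry == null || entry.fetchingRound != fetchingRound) {
            return false; // stale response from an earlier fetching round
        }
        entry.fetching = false;
        entry.value = value;
        return true;
    }
}

Because stale responses are merely dropped rather than treated as failures, a late reply from a slow node never overwrites data gathered by a newer round.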
diff --git a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java similarity index 89% rename from server/src/main/java/org/opensearch/gateway/BaseShardCache.java rename to server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 62c5564737a81..a2847f09bc526 100644 --- a/server/src/main/java/org/opensearch/gateway/BaseShardCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -18,7 +18,6 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.core.index.shard.ShardId; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.transport.ReceiveTimeoutTransportException; import java.util.ArrayList; @@ -31,10 +30,12 @@ import reactor.util.annotation.NonNull; /** - * Common functionalities of a cache for storing shard metadata. Cache maintains node level responses. - * Setting up the cache is required from implementation class. - * Store node level responses of transport actions like {@link TransportNodesListGatewayStartedShards} or - * {@link TransportNodesListShardStoreMetadata} using the given functionalities. + * AsyncShardFetchCache will operate on the node level cache(Map). initData, putData + * and getData needs to be called for all the nodes. This class is responsible for managing the flow for all the + * nodes. + * It'll also give useful insights like how many ongoing fetches are happening, how many nodes are left for fetch or + * mark some node in fetching mode. All of these functionalities require checking the cache information and respond + * accordingly. *

* initData : how to initialize an entry of shard cache for a node. * putData : how to store the response of transport action in the cache. @@ -44,45 +45,23 @@ * * @param Response type of transport action which has the data to be stored in the cache. */ -public abstract class BaseShardCache { +public abstract class AsyncShardFetchCache { private final Logger logger; private final String type; - protected BaseShardCache(Logger logger, String type) { + protected AsyncShardFetchCache(Logger logger, String type) { this.logger = logger; this.type = type; } - /** - * Initialize cache's entry for a node. - * - * @param node for which node we need to initialize the cache. - */ abstract void initData(DiscoveryNode node); - /** - * Store the response in the cache from node. - * - * @param node node from which we got the response. - * @param response shard metadata coming from node. - */ abstract void putData(DiscoveryNode node, K response); - /** - * Populate the response from cache. - * - * @param node node for which we need the response. - * @return actual response. - */ abstract K getData(DiscoveryNode node); - /** - * Get actual map object of the cache - * - * @return map of nodeId and NodeEntry extending BaseNodeEntry - */ @NonNull - abstract Map getCache(); + abstract Map getCache(); /** * Cleanup cached data for this shard once it's started. Cleanup only happens at shard level. Node entries will @@ -90,7 +69,7 @@ protected BaseShardCache(Logger logger, String type) { * * @param shardId for which we need to free up the cached data. */ - abstract void deleteData(ShardId shardId); + abstract void deleteShard(ShardId shardId); /** * Returns the number of fetches that are currently ongoing. diff --git a/server/src/main/java/org/opensearch/gateway/ShardCache.java b/server/src/main/java/org/opensearch/gateway/ShardCache.java deleted file mode 100644 index 42c9b22f553bc..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/ShardCache.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway; - -import org.apache.logging.log4j.Logger; -import org.opensearch.action.support.nodes.BaseNodeResponse; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.common.Nullable; -import org.opensearch.common.logging.Loggers; -import org.opensearch.core.index.shard.ShardId; - -import java.util.HashMap; -import java.util.Map; - -import reactor.util.annotation.NonNull; - -/** - * Cache implementation of transport actions returning single shard related data in the response. - * - * @param Response type of transport action. 
- */ -public class ShardCache extends BaseShardCache { - - private final Map> cache; - - public ShardCache(Logger logger, String logKey, String type) { - super(Loggers.getLogger(logger, "_" + logKey), type); - cache = new HashMap<>(); - } - - @Override - public void initData(DiscoveryNode node) { - cache.put(node.getId(), new NodeEntry<>(node.getId())); - } - - @Override - public void putData(DiscoveryNode node, K response) { - cache.get(node.getId()).doneFetching(response); - } - - @Override - public K getData(DiscoveryNode node) { - return cache.get(node.getId()).getValue(); - } - - @NonNull - @Override - public Map getCache() { - return cache; - } - - @Override - public void deleteData(ShardId shardId) { - cache.clear(); // single shard cache can clear the full map - } - - /** - * A node entry, holding the state of the fetched data for a specific shard - * for a giving node. - */ - static class NodeEntry extends BaseShardCache.BaseNodeEntry { - @Nullable - private T value; - - void doneFetching(T value) { - super.doneFetching(); - this.value = value; - } - - NodeEntry(String nodeId) { - super(nodeId); - } - - T getValue() { - return value; - } - - } -} From ad0026a22cfbd8cd6ff47042f41f0c6e44383406 Mon Sep 17 00:00:00 2001 From: Aman Khare Date: Fri, 8 Mar 2024 18:51:15 +0530 Subject: [PATCH 11/11] Correct comment on java class Signed-off-by: Aman Khare --- .../java/org/opensearch/gateway/AsyncShardFetchCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java index 9fb85b8932c7d..3140ceef4f3ee 100644 --- a/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java +++ b/server/src/main/java/org/opensearch/gateway/AsyncShardFetchCache.java @@ -30,9 +30,9 @@ import reactor.util.annotation.NonNull; /** - * AsyncShardFetchCache will operate on the node level cache(Map). initData, putData - * and getData needs to be called for all the nodes. This class is responsible for managing the flow for all the - * nodes. + * AsyncShardFetchCache will operate on the node level cache which is map of String and BaseNodeEntry. initData, + * putData and getData needs to be called for all the nodes. This class is responsible for managing the flow for all + * the nodes. * It'll also give useful insights like how many ongoing fetches are happening, how many nodes are left for fetch or * mark some node in fetching mode. All of these functionalities require checking the cache information and respond * accordingly.
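Taken together, the series ends with a template-method split: AsyncShardFetchCache drives the per-node flow (finding nodes to fetch, marking rounds, processing responses and failures), while a subclass such as the inner ShardCache supplies only the storage hooks. A compact standalone sketch of that contract (the names below are hypothetical analogues, not the real OpenSearch types, and the method signatures are simplified):

import java.util.HashMap;
import java.util.Map;

// Hypothetical analogue of the AsyncShardFetchCache/ShardCache split: the
// abstract base owns the flow, the subclass owns the backing map.
abstract class NodeResponseCache<K> {
    abstract void initData(String nodeId);
    abstract void putData(String nodeId, K response);
    abstract K getData(String nodeId);

    // Base-class flow: gather whatever has been fetched so far, leaving
    // nodes that have not responded yet out of the result.
    Map<String, K> getCacheData(Iterable<String> nodeIds) {
        Map<String, K> fetched = new HashMap<>();
        for (String nodeId : nodeIds) {
            K value = getData(nodeId);
            if (value != null) {
                fetched.put(nodeId, value);
            }
        }
        return fetched;
    }
}

// The subclass only decides how responses are stored, as ShardCache does.
class SingleShardCache<K> extends NodeResponseCache<K> {
    private final Map<String, K> cache = new HashMap<>();

    @Override
    void initData(String nodeId) {
        cache.put(nodeId, null); // reserve an entry for a newly seen node
    }

    @Override
    void putData(String nodeId, K response) {
        cache.put(nodeId, response);
    }

    @Override
    K getData(String nodeId) {
        return cache.get(nodeId);
    }
}

Keeping the flow in the base class means the response, failure, and retry handling is written once, and an alternative cache implementation only has to decide how per-node data is stored.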