Skip to content

Commit

Permalink
Remove failed shard handling from base class as it should be there in…
Browse files Browse the repository at this point in the history
… child class

Signed-off-by: Aman Khare <[email protected]>
  • Loading branch information
Aman Khare committed Mar 1, 2024
1 parent 8383c63 commit 11e145d
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 18 deletions.
14 changes: 0 additions & 14 deletions server/src/main/java/org/opensearch/gateway/AsyncShardFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
}

cache.fillShardCacheWithDataNodes(nodes);
List<ShardId> failedShards = clearFailedShards();
List<String> nodeIds = cache.findNodesToFetch();
if (nodeIds.isEmpty() == false) {
// mark all node as fetching and go ahead and async fetch them
Expand Down Expand Up @@ -206,25 +205,12 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
+ allIgnoreNodesMap.values().stream().mapToInt(Set::size).sum()
+ "]"
);
} else if (failedShards.isEmpty() == false) {
// trigger a reroute if there are any shards failed, to make sure they're picked up in next run
logger.trace("triggering another reroute for failed shards in {}", reroutingKey);
reroute("shards-failed", "shards failed in " + reroutingKey);
}

return new FetchResult<>(fetchData, allIgnoreNodesMap);
}
}

private List<ShardId> clearFailedShards() {
// get failed shards from previous fetch and remove them
List<ShardId> failedShards = cache.getFailedShards();
if (failedShards != null && failedShards.isEmpty() == false) {
shardAttributesMap.keySet().removeIf(failedShards::contains);
}
return failedShards;
}

/**
* Called by the response handler of the async action to fetch data. Verifies that its still working
* on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ protected BaseShardCache(Logger logger, String logKey, String type) {
public abstract K getData(DiscoveryNode node);

/**
* Provide the list of shards which got failures, these shards should be removed
* Provide the list of shards which got failures, these shards should be retried
* @return list of failed shards
*/
public abstract List<ShardId> getFailedShards();
Expand Down Expand Up @@ -184,7 +184,7 @@ public void processResponses(List<K> responses, long fetchingRound) {
}
}

public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) {
private boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound) {
if (nodeEntry.getFetchingRound() != fetchingRound) {
assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
logger.trace(
Expand All @@ -203,7 +203,7 @@ public boolean validateNodeResponse(BaseNodeEntry nodeEntry, long fetchingRound)
return true;
}

public void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
private void handleNodeFailure(BaseNodeEntry nodeEntry, FailedNodeException failure, long fetchingRound) {
if (nodeEntry.getFetchingRound() != fetchingRound) {
assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
logger.trace(
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/gateway/ShardCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@
*/
public class ShardCache<K extends BaseNodeResponse> extends BaseShardCache<K> {

private final Map<String, NodeEntry<K>> cache = new HashMap<>();
private final Map<String, NodeEntry<K>> cache;

public ShardCache(Logger logger, String logKey, String type) {
super(logger, logKey, type);
cache = new HashMap<>();
}

@Override
Expand Down

0 comments on commit 11e145d

Please sign in to comment.