From 12c6d7baff657469bfb9787d2ca8009a1afc618e Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Wed, 4 Oct 2023 05:10:52 +0530 Subject: [PATCH] RSBA after RSA refactor Signed-off-by: Shivansh Arora --- .../gateway/ReplicaShardBatchAllocator.java | 409 ++---------------- .../ReplicaShardBatchAllocatorTest.java | 5 +- 2 files changed, 51 insertions(+), 363 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 65b714257c274..f0cc831a4af0f 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -9,7 +9,6 @@ package org.opensearch.gateway; import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.RoutingNode; @@ -21,11 +20,9 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.Tuple; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.common.unit.TimeValue; -import org.opensearch.gateway.AsyncBatchShardFetch.FetchResult; -import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; +import org.opensearch.gateway.AsyncShardFetch.FetchResult; +import org.opensearch.indices.store.StoreFilesMetadata; +import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; import java.util.ArrayList; @@ -35,10 +32,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; -import static org.opensearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; - -public abstract class ReplicaShardBatchAllocator extends BaseGatewayShardAllocator { +/** + * Allocates replica shards in a batch mode + * + * @opensearch.internal + */ +public abstract class ReplicaShardBatchAllocator extends ReplicaShardAllocator { /** * Process existing recoveries of replicas and see if we need to cancel them if we find a better @@ -46,29 +47,28 @@ public abstract class ReplicaShardBatchAllocator extends BaseGatewayShardAllocat * has to copy segment files. */ public void processExistingRecoveries(RoutingAllocation allocation, List> shardBatches) { - Metadata metadata = allocation.metadata(); RoutingNodes routingNodes = allocation.routingNodes(); List shardCancellationActions = new ArrayList<>(); for (Set shardBatch : shardBatches) { Set eligibleFetchShards = new HashSet<>(); Set ineligibleShards = new HashSet<>(); for (ShardRouting shard : shardBatch) { - if (shard.primary()) { - ineligibleShards.add(shard); - continue; - } - if (shard.initializing() == false) { - continue; - } - if (shard.relocatingNodeId() != null) { - continue; - } - - // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... - if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) { - continue; + shardMatched = false; + for (RoutingNode routingNode : routingNodes) { + if (routingNode.getByShardId(shard.shardId()) != null) { + ShardRouting shardFromRoutingNode = routingNode.getByShardId(shard.shardId()); + if (!shardFromRoutingNode.primary()){ + shardMatched = true; + if (shouldSkipFetchForRecovery(shardFromRoutingNode)) { + continue; + } + eligibleFetchShards.add(shardFromRoutingNode); + } + } + if (shardMatched){ + break; + } } - eligibleFetchShards.add(shard); } AsyncBatchShardFetch.FetchResult shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation); if (shardState.hasData() == false) { @@ -76,60 +76,11 @@ public void processExistingRecoveries(RoutingAllocation allocation, List failedNodeIds = shard.unassignedInfo() == null - ? Collections.emptySet() - : shard.unassignedInfo().getFailedNodeIds(); - UnassignedInfo unassignedInfo = new UnassignedInfo( - UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" - + currentNode - + "] cancelled, can perform a noop recovery on [" - + nodeWithHighestMatch - + "]", - null, - 0, - allocation.getCurrentNanoTime(), - System.currentTimeMillis(), - false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT, - failedNodeIds - ); - // don't cancel shard in the loop as it will cause a ConcurrentModificationException - shardCancellationActions.add( - () -> routingNodes.failShard( - logger, - shard, - unassignedInfo, - metadata.getIndexSafe(shard.index()), - allocation.changes() - ) - ); - } + Map nodeShardStores = getNodeShardStores(shard, shardState); + + Runnable cancellationAction = getShardCancellationAction(shard, allocation, nodeShardStores); + if (cancellationAction != null) { + shardCancellationActions.add(cancellationAction); } } } @@ -138,17 +89,16 @@ && canPerformOperationBasedRecovery(primaryStore, shardState, currentNode, shard } } - private static boolean isResponsibleFor(final ShardRouting shard) { - return !shard.primary() // must be a replica - && shard.unassigned() // must be unassigned - // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one... - && shard.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED; - } abstract protected FetchResult fetchData(Set shardEligibleForFetch, Set inEligibleShards, RoutingAllocation allocation); + @Override + protected FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { + return null; + } + @Override public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, RoutingAllocation allocation, Logger logger) { return null; @@ -188,6 +138,10 @@ public HashMap makeAllocationDecision( shardsEligibleForFetch.add(shard); } + // Do not call fetchData if there are no eligible shards + if (shardsEligibleForFetch.isEmpty()) { + return shardAllocationDecisions; + } // only fetch data for eligible shards final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); @@ -204,285 +158,18 @@ public HashMap makeAllocationDecision( continue; } Tuple> result = nodeAllocationDecisions.get(unassignedShard); - ShardRouting primaryShard = routingNodes.activePrimary(unassignedShard.shardId()); - if (primaryShard == null) { - assert explain : "primary should only be null here if we are in explain mode, so we didn't " - + "exit early when canBeAllocatedToAtLeastOneNode didn't return a YES decision"; - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.no( - UnassignedInfo.AllocationStatus.fromDecision(result.v1().type()), - result.v2() != null ? new ArrayList<>(result.v2().values()) : null - )); - continue; - } - assert primaryShard.currentNodeId() != null; - final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardsState, unassignedShard); - if (primaryStore == null) { - // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) - // we want to let the replica be allocated in order to expose the actual problem with the primary that the replica - // will try and recover from - // Note, this is the existing behavior, as exposed in running CorruptFileTest#testNoPrimaryData - logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", unassignedShard); - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.NOT_TAKEN); - continue; - } - - // find the matching nodes - ReplicaShardAllocator.MatchingNodes matchingNodes = findMatchingNodes( - unassignedShard, - allocation, - false, - primaryNode, - primaryStore, - shardsState, - explain - ); - - assert explain == false || matchingNodes.getNodeDecisions() != null : "in explain mode, we must have individual node decisions"; - - List nodeDecisions = ReplicaShardAllocator.augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.getNodeDecisions()); - if (result.v1().type() != Decision.Type.YES) { - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(result.v1().type()), nodeDecisions)); - continue; - } else if (matchingNodes.getNodeWithHighestMatch() != null) { - RoutingNode nodeWithHighestMatch = allocation.routingNodes().node(matchingNodes.getNodeWithHighestMatch().getId()); - // we only check on THROTTLE since we checked before on NO - Decision decision = allocation.deciders().canAllocate(unassignedShard, nodeWithHighestMatch, allocation); - if (decision.type() == Decision.Type.THROTTLE) { - logger.debug( - "[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - nodeWithHighestMatch.node() - ); - // we are throttling this, as we have enough other shards to allocate to this node, so ignore it for now - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.throttle(nodeDecisions)); - } else { - logger.debug( - "[{}][{}]: allocating [{}] to [{}] in order to reuse its unallocated persistent store", - unassignedShard.index(), - unassignedShard.id(), - unassignedShard, - nodeWithHighestMatch.node() - ); - // we found a match - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.yes(nodeWithHighestMatch.node(), null, nodeDecisions, true)); - } - continue; - } else if (matchingNodes.hasAnyData() == false && unassignedShard.unassignedInfo().isDelayed()) { - // if we didn't manage to find *any* data (regardless of matching sizes), and the replica is - // unassigned due to a node leaving, so we delay allocation of this replica to see if the - // node with the shard copy will rejoin so we can re-use the copy it has - logger.debug("{}: allocation of [{}] is delayed", unassignedShard.shardId(), unassignedShard); - long remainingDelayMillis = 0L; - long totalDelayMillis = 0L; - if (explain) { - UnassignedInfo unassignedInfo = unassignedShard.unassignedInfo(); - Metadata metadata = allocation.metadata(); - IndexMetadata indexMetadata = metadata.index(unassignedShard.index()); - totalDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetadata.getSettings()).getMillis(); - long remainingDelayNanos = unassignedInfo.getRemainingDelay(System.nanoTime(), indexMetadata.getSettings()); - remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis(); - } - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.delayed(remainingDelayMillis, totalDelayMillis, nodeDecisions)); - continue; - } - - shardAllocationDecisions.put(unassignedShard, AllocateUnassignedDecision.NOT_TAKEN); + shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, getNodeShardStores(unassignedShard, shardsState), result, logger)); } return shardAllocationDecisions; } - private ReplicaShardAllocator.MatchingNodes findMatchingNodes( - ShardRouting shard, - RoutingAllocation allocation, - boolean noMatchFailedNodes, - DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, - FetchResult data, - boolean explain - ) { - Map matchingNodes = new HashMap<>(); - Map nodeDecisions = explain ? new HashMap<>() : null; - for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { - DiscoveryNode discoNode = nodeStoreEntry.getKey(); - if (noMatchFailedNodes - && shard.unassignedInfo() != null - && shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { - continue; - } - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFilesMetadata = nodeStoreEntry.getValue() - .getNodeStoreFilesMetadataBatch().get(shard.shardId()).storeFilesMetadata(); - // we don't have any files at all, it is an empty index - if (storeFilesMetadata.isEmpty()) { - continue; - } - - RoutingNode node = allocation.routingNodes().node(discoNode.getId()); - if (node == null) { - continue; - } - - // check if we can allocate on that node... - // we only check for NO, since if this node is THROTTLING and it has enough "same data" - // then we will try and assign it next time - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - ReplicaShardAllocator.MatchingNode matchingNode = null; - if (explain) { - matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata); - NodeAllocationResult.ShardStoreInfo shardStoreInfo = new NodeAllocationResult.ShardStoreInfo(matchingNode.matchingBytes); - nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision)); - } - - if (decision.type() == Decision.Type.NO) { - continue; - } - - if (matchingNode == null) { - matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetadata); - } - matchingNodes.put(discoNode, matchingNode); - if (logger.isTraceEnabled()) { - if (matchingNode.isNoopRecovery) { - logger.trace("{}: node [{}] can perform a noop recovery", shard, discoNode.getName()); - } else if (matchingNode.retainingSeqNo >= 0) { - logger.trace( - "{}: node [{}] can perform operation-based recovery with retaining sequence number [{}]", - shard, - discoNode.getName(), - matchingNode.retainingSeqNo - ); - } else { - logger.trace( - "{}: node [{}] has [{}/{}] bytes of re-usable data", - shard, - discoNode.getName(), - new ByteSizeValue(matchingNode.matchingBytes), - matchingNode.matchingBytes - ); - } - } - } - - return new ReplicaShardAllocator.MatchingNodes(matchingNodes, nodeDecisions); - } - - private static ReplicaShardAllocator.MatchingNode computeMatchingNode( - DiscoveryNode primaryNode, - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, - DiscoveryNode replicaNode, - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata replicaStore - ) { - final long retainingSeqNoForPrimary = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(primaryNode); - final long retainingSeqNoForReplica = primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(replicaNode); - final boolean isNoopRecovery = (retainingSeqNoForReplica >= retainingSeqNoForPrimary && retainingSeqNoForPrimary >= 0) - || hasMatchingSyncId(primaryStore, replicaStore); - final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore); - return new ReplicaShardAllocator.MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery); - } - - private static boolean hasMatchingSyncId( - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata replicaStore - ) { - String primarySyncId = primaryStore.syncId(); - return primarySyncId != null && primarySyncId.equals(replicaStore.syncId()); - } - - private static long computeMatchingBytes( - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata storeFilesMetadata - ) { - long sizeMatched = 0; - for (StoreFileMetadata storeFileMetadata : storeFilesMetadata) { - String metadataFileName = storeFileMetadata.name(); - if (primaryStore.fileExists(metadataFileName) && primaryStore.file(metadataFileName).isSame(storeFileMetadata)) { - sizeMatched += storeFileMetadata.length(); - } - } - return sizeMatched; - } - - /** - * Determines if the shard can be allocated on at least one node based on the allocation deciders. - *

- * Returns the best allocation decision for allocating the shard on any node (i.e. YES if at least one - * node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided - * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element - * in the returned tuple. - */ - private static Tuple> canBeAllocatedToAtLeastOneNode( - ShardRouting shard, - RoutingAllocation allocation - ) { - Decision madeDecision = Decision.NO; - final boolean explain = allocation.debugDecision(); - Map nodeDecisions = explain ? new HashMap<>() : null; - for (final DiscoveryNode cursor : allocation.nodes().getDataNodes().values()) { - RoutingNode node = allocation.routingNodes().node(cursor.getId()); - if (node == null) { - continue; - } - // if we can't allocate it on a node, ignore it, for example, this handles - // cases for only allocating a replica after a primary - Decision decision = allocation.deciders().canAllocate(shard, node, allocation); - if (decision.type() == Decision.Type.YES && madeDecision.type() != Decision.Type.YES) { - if (explain) { - madeDecision = decision; - } else { - return Tuple.tuple(decision, null); - } - } else if (madeDecision.type() == Decision.Type.NO && decision.type() == Decision.Type.THROTTLE) { - madeDecision = decision; - } - if (explain) { - nodeDecisions.put(node.nodeId(), new NodeAllocationResult(node.node(), null, decision)); - } - } - return Tuple.tuple(madeDecision, nodeDecisions); - } - - protected abstract boolean hasInitiatedFetching(ShardRouting shard); - - private static TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata findStore( - DiscoveryNode node, - FetchResult data, - ShardRouting shard - ) { - NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(node); - if (nodeFilesStore == null) { - return null; - } - TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeFileStoreMetadata = nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId()); - if (nodeFileStoreMetadata.getStoreFileFetchException() != null) { - // Do we need to throw an exception here, to handle this case differently? - return null; - } - return nodeFileStoreMetadata.storeFilesMetadata(); - } - - private static boolean canPerformOperationBasedRecovery( - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore, - FetchResult data, - DiscoveryNode targetNode, - ShardRouting shard - ) { - final NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(targetNode); - if (nodeFilesStore == null) { - return false; - } - TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata nodeFileStoreMetadata = nodeFilesStore.getNodeStoreFilesMetadataBatch().get(shard.shardId()); - if (nodeFileStoreMetadata.getStoreFileFetchException() != null) { - return false; - } - TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata targetNodeStore = nodeFileStoreMetadata.storeFilesMetadata(); - if (targetNodeStore == null || targetNodeStore.isEmpty()) { - return false; - } - if (hasMatchingSyncId(primaryStore, targetNodeStore)) { - return true; - } - return primaryStore.getPeerRecoveryRetentionLeaseRetainingSeqNo(targetNode) >= 0; + private Map getNodeShardStores(ShardRouting unassignedShard, FetchResult data) { + assert data.hasData(); + return new HashMap<>( + data.getData().entrySet().stream().collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().getNodeStoreFilesMetadataBatch().get(unassignedShard.shardId()).storeFilesMetadata() + )) + ); } } diff --git a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java index 0999dd9f8467b..8563869682d89 100644 --- a/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java +++ b/server/src/test/java/org/opensearch/gateway/ReplicaShardBatchAllocatorTest.java @@ -44,6 +44,7 @@ import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.store.StoreFilesMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; import org.opensearch.snapshots.SnapshotShardSizeInfo; @@ -112,7 +113,7 @@ public void testAsyncFetchWithNoShardOnIndexCreation() { ); testBatchAllocator.clean(); allocateAllUnassignedBatch(allocation); - assertThat(testBatchAllocator.getFetchDataCalledAndClean(), equalTo(true)); + assertThat(testBatchAllocator.getFetchDataCalledAndClean(), equalTo(false)); assertThat(testBatchAllocator.getShardEligibleFetchDataCountAndClean(), equalTo(0)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); @@ -706,7 +707,7 @@ public TestBatchAllocator addData( data.put( node, new TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadata( - new TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata( + new StoreFilesMetadata( shardId, new Store.MetadataSnapshot(unmodifiableMap(filesAsMap), unmodifiableMap(commitData), randomInt()), peerRecoveryRetentionLeases