Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Oct 1, 2019
1 parent 13bb82b commit 17bfb34
Show file tree
Hide file tree
Showing 3 changed files with 284 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData;
Expand All @@ -47,15 +48,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;

public abstract class ReplicaShardAllocator extends BaseGatewayShardAllocator {
/**
* Process existing recoveries of replicas and see if we need to cancel them if we find a better
* match. Today, a better match is one that has full sync id match or peer recovery retention lease
* compared to not having one in the previous recovery.
* match. Today, a better match is one that can perform a no-op recovery while the previous recovery
* has to copy segment files.
*/
public void processExistingRecoveries(RoutingAllocation allocation) {
MetaData metaData = allocation.metaData();
Expand Down Expand Up @@ -86,40 +86,30 @@ public void processExistingRecoveries(RoutingAllocation allocation) {

ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
primaryNode != null ? findStore(primaryNode, shardStores) : null;
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
logger.trace("{}: no primary shard store found or allocated, letting actual allocation figure it out", shard);
continue;
}

MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryStore, shardStores, false);
MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false);
if (matchingNodes.getNodeWithHighestMatch() != null) {
DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId());
DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch();
BooleanSupplier currentNodeCanSkipPhase1 = () -> {
TransportNodesListShardStoreMetaData.StoreFilesMetaData storeMetadata =
shardStores.getData().get(currentNode).storeFilesMetaData();
// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
if (storeMetadata == null) {
return false;
}
IndexMetaData indexMetadata = allocation.metaData().index(shard.index());
if (canPerformOperationBasedRecovery(indexMetadata, primaryStore, currentNode, storeMetadata)) {
return true;
}
final String currentSyncId = storeMetadata.syncId();
return currentSyncId != null && currentSyncId.equals(primaryStore.syncId());
};

// current node will not be in matchingNodes as it is filtered away by SameShardAllocationDecider
if (currentNode.equals(nodeWithHighestMatch) == false
&& matchingNodes.canSkipPhase1(nodeWithHighestMatch) && currentNodeCanSkipPhase1.getAsBoolean() == false) {
// we found a better match that can skip phase 1, cancel the existing allocation.
logger.debug("cancelling allocation of replica on [{}], sync id match found on node [{}]",
&& matchingNodes.canPerformNoopRecovery(nodeWithHighestMatch)
&& canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == false) {
// we found a better match that can perform noop recovery, cancel the existing allocation.
logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]",
currentNode, nodeWithHighestMatch);
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node ["+
"existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+
nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false,
UnassignedInfo.AllocationStatus.NO_ATTEMPT);
Expand Down Expand Up @@ -186,8 +176,10 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
return AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocateDecision.type()),
new ArrayList<>(result.v2().values()));
}

TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore = findStore(primaryShard, allocation, shardStores);
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore =
primaryNode != null ? findStore(primaryNode, shardStores) : null;
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// we want to let the replica be allocated in order to expose the actual problem with the primary that the replica
Expand All @@ -197,7 +189,7 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas
return AllocateUnassignedDecision.NOT_TAKEN;
}

MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryStore, shardStores, explain);
MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain);
assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions";

List<NodeAllocationResult> nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions);
Expand Down Expand Up @@ -299,22 +291,17 @@ private static List<NodeAllocationResult> augmentExplanationsWithStoreInfo(Map<S
/**
* Finds the store for the assigned shard in the fetched data, returns null if none is found.
*/
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(ShardRouting shard, RoutingAllocation allocation,
private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore(DiscoveryNode node,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data) {
assert shard.currentNodeId() != null;
DiscoveryNode primaryNode = allocation.nodes().get(shard.currentNodeId());
if (primaryNode == null) {
NodeStoreFilesMetaData nodeFilesStore = data.getData().get(node);
if (nodeFilesStore == null) {
return null;
}
NodeStoreFilesMetaData primaryNodeFilesStore = data.getData().get(primaryNode);
if (primaryNodeFilesStore == null) {
return null;
}
return primaryNodeFilesStore.storeFilesMetaData();
return nodeFilesStore.storeFilesMetaData();
}

private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation,
TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> data,
boolean explain) {
Map<DiscoveryNode, MatchingNode> matchingNodes = new HashMap<>();
Expand All @@ -336,12 +323,9 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
// we only check for NO, since if this node is THROTTLING and it has enough "same data"
// then we will try and assign it next time
Decision decision = allocation.deciders().canAllocate(shard, node, allocation);
final IndexMetaData indexMetaData = allocation.metaData().index(primaryStore.shardId().getIndex());

MatchingNode matchingNode = null;
if (explain) {
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData);
ShardStoreInfo shardStoreInfo = new ShardStoreInfo(matchingNode.matchingBytes);
nodeDecisions.put(node.nodeId(), new NodeAllocationResult(discoNode, shardStoreInfo, decision));
}
Expand All @@ -351,14 +335,13 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al
}

if (matchingNode == null) {
matchingNode = new MatchingNode(computeMatchingBytes(primaryStore, storeFilesMetaData),
canPerformOperationBasedRecovery(indexMetaData, primaryStore, discoNode, storeFilesMetaData));
matchingNode = computeMatchingNode(primaryNode, primaryStore, discoNode, storeFilesMetaData);
}
matchingNodes.put(discoNode, matchingNode);
if (logger.isTraceEnabled()) {
if (matchingNode.matchingBytes == Long.MAX_VALUE) {
logger.trace("{}: node [{}] has same sync id {} as primary", shard, discoNode.getName(), storeFilesMetaData.syncId());
} else if (matchingNode.operationBasedRecovery){
} else if (matchingNode.matchingOperations > 0){
logger.trace("{}: node [{}] can perform operation-based recovery", shard, discoNode.getName());
} else {
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
Expand Down Expand Up @@ -389,41 +372,68 @@ private static long computeMatchingBytes(TransportNodesListShardStoreMetaData.St
}
}

private static boolean canPerformOperationBasedRecovery(
IndexMetaData indexMetaData, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
/**
 * Returns the sequence number that the primary's peer-recovery retention lease retains for the
 * given node, or {@code 0} if the primary holds no such lease for that node.
 */
private static long getRetainingSeqNoForNode(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, DiscoveryNode node) {
    final String leaseIdForNode = ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId());
    // Equivalent to filtering the lease stream by id and taking the first match.
    for (RetentionLease lease : primaryStore.peerRecoveryRetentionLeases()) {
        if (lease.id().equals(leaseIdForNode)) {
            return lease.retainingSequenceNumber();
        }
    }
    return 0L;
}

private static MatchingNode computeMatchingNode(
DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
DiscoveryNode replicaNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData replicaStore) {
if (replicaStore.isEmpty()) {
// a corrupted store - it won't be able to perform operation-based recovery
return false;
return new MatchingNode(0, 0, false); // store is corrupted
}
// If an index is closed or frozen, we can perform an operation-based recovery only if the last commit on the replica is safe and
// has the same operations as the last commit on the primary. Here we must ignore the peer recovery retention lease as we don't
// have the persisted global checkpoint from the replica to determine if the last commit is safe. However, the likelihood that
// the last commit is unsafe is very small. It's probably okay to use the retention lease if the sequence numbers of the last
// commit from the primary and replica are equal.
if (indexMetaData.getState() == IndexMetaData.State.CLOSE ||
IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexMetaData.getSettings())) {
final long retainingSeqNoForPrimary = getRetainingSeqNoForNode(primaryStore, primaryNode);
final long retainingSeqNoForReplica = getRetainingSeqNoForNode(primaryStore, replicaNode);
final long matchingBytes = computeMatchingBytes(primaryStore, replicaStore);
final boolean isNoopRecovery = matchingBytes == Long.MAX_VALUE
|| (retainingSeqNoForReplica > 0 && retainingSeqNoForReplica == retainingSeqNoForPrimary);
return new MatchingNode(matchingBytes, retainingSeqNoForReplica, isNoopRecovery);
}

private static boolean canPerformOperationBasedRecovery(TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore,
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores,
DiscoveryNode targetNode) {
final NodeStoreFilesMetaData targetNodeStore = shardStores.getData().get(targetNode);
if (targetNodeStore == null || targetNodeStore.storeFilesMetaData().isEmpty()) {
return false;
}
final String retentionLeaseId = ReplicationTracker.getPeerRecoveryRetentionLeaseId(replicaNode.getId());
return primaryStore.peerRecoveryRetentionLeases().stream().anyMatch(lease -> lease.id().equals(retentionLeaseId));
final String primarySyncId = primaryStore.syncId();
if (primarySyncId != null && primarySyncId.equals(targetNodeStore.storeFilesMetaData().syncId())) {
return true;
}
return getRetainingSeqNoForNode(primaryStore, targetNode) > 0;
}


protected abstract AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation);

/**
* Returns a boolean indicating whether fetching shard data has been triggered at any point for the given shard.
*/
protected abstract boolean hasInitiatedFetching(ShardRouting shard);

private static class MatchingNode {
private static class MatchingNode implements Comparable<MatchingNode> {
final long matchingBytes;
final boolean operationBasedRecovery;
final long matchingOperations;
final boolean isNoopRecovery;

MatchingNode(long matchingBytes, boolean operationBasedRecovery) {
MatchingNode(long matchingBytes, long matchingOperations, boolean isNoopRecovery) {
this.matchingBytes = matchingBytes;
this.operationBasedRecovery = operationBasedRecovery;
this.matchingOperations = matchingOperations;
this.isNoopRecovery = isNoopRecovery;
}

@Override
public int compareTo(MatchingNode that) {
    // Order candidates from least to most desirable: noop-recovery capability dominates,
    // then the number of retained operations, then the number of matching store bytes.
    final int byNoopRecovery = Boolean.compare(this.isNoopRecovery, that.isNoopRecovery);
    if (byNoopRecovery != 0) {
        return byNoopRecovery;
    }
    final int byOperations = Long.compare(this.matchingOperations, that.matchingOperations);
    if (byOperations != 0) {
        return byOperations;
    }
    return Long.compare(this.matchingBytes, that.matchingBytes);
}
}

Expand All @@ -437,13 +447,10 @@ static class MatchingNodes {
this.matchingNodes = matchingNodes;
this.nodeDecisions = nodeDecisions;

MatchingNode highestMatchValue = new MatchingNode(0, false);
MatchingNode highestMatchValue = new MatchingNode(0, 0, false);
DiscoveryNode highestMatchNode = null;

for (Map.Entry<DiscoveryNode, MatchingNode> entry : matchingNodes.entrySet()) {
if (highestMatchValue.operationBasedRecovery == false && entry.getValue().operationBasedRecovery ||
(highestMatchValue.operationBasedRecovery == entry.getValue().operationBasedRecovery &&
highestMatchValue.matchingBytes < entry.getValue().matchingBytes)){
if (highestMatchValue.compareTo(entry.getValue()) < 0) {
highestMatchValue = entry.getValue();
highestMatchNode = entry.getKey();
}
Expand All @@ -460,9 +467,9 @@ public DiscoveryNode getNodeWithHighestMatch() {
return this.nodeWithHighestMatch;
}

boolean canSkipPhase1(DiscoveryNode node) {
boolean canPerformNoopRecovery(DiscoveryNode node) {
final MatchingNode matchingNode = matchingNodes.get(node);
return matchingNode.matchingBytes == Long.MAX_VALUE || matchingNode.operationBasedRecovery;
return matchingNode.isNoopRecovery;
}

/**
Expand Down
Loading

0 comments on commit 17bfb34

Please sign in to comment.