From 81a65fd88253ec3160b424dd21de554ad77a6bbb Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Fri, 1 Nov 2019 09:23:46 -0400 Subject: [PATCH] Do not cancel recovery for copy on broken node (#48265) This change fixes a poisonous situation where an ongoing recovery was canceled because a better copy was found on a node to which the cluster had previously tried, and failed, to allocate the shard. The solution is to keep track of the set of nodes on which an allocation of the shard has failed, so that we can avoid canceling the current recovery in favor of a copy on one of those failed nodes. Closes #47974 --- .../cluster/routing/RoutingNodes.java | 4 +- .../cluster/routing/UnassignedInfo.java | 47 ++++++++++- .../routing/allocation/AllocationService.java | 21 +++-- .../allocator/BalancedShardsAllocator.java | 3 +- ...AllocateEmptyPrimaryAllocationCommand.java | 3 +- .../gateway/ReplicaShardAllocator.java | 17 +++- .../cluster/routing/UnassignedInfoTests.java | 25 ++++-- .../TrackFailedAllocationNodesTests.java | 81 +++++++++++++++++++ ...storeInProgressAllocationDeciderTests.java | 4 +- .../gateway/ReplicaShardAllocatorIT.java | 41 ++++++++++ .../gateway/ReplicaShardAllocatorTests.java | 68 +++++++++++++--- .../indices/recovery/IndexRecoveryIT.java | 1 - 12 files changed, 281 insertions(+), 34 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 4750476805d88..e286a1f52ae19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; UnassignedInfo 
primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT); + unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet()); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver); } } @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus); + allocationStatus, currInfo.getFailedNodeIds()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index de0ef52d7b78c..41cc052501021 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -39,8 +40,11 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; import java.util.Locale; import 
java.util.Objects; +import java.util.Set; /** * Holds additional information as to why the shard is in unassigned state. @@ -214,6 +218,7 @@ public String value() { private final String message; private final Exception failure; private final int failedAllocations; + private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard /** @@ -224,7 +229,7 @@ public String value() { **/ public UnassignedInfo(Reason reason, String message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, Collections.emptySet()); } /** @@ -235,9 +240,11 @@ public UnassignedInfo(Reason reason, String message) { * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. 
* @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, - long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) { + long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, + Set failedNodeIds) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -246,6 +253,7 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); + this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds); assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; @@ -263,6 +271,11 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failure = in.readException(); this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_7_5_0)) { + this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + } else { + this.failedNodeIds = Collections.emptySet(); + } } public void writeTo(StreamOutput out) throws IOException { @@ -280,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeException(failure); out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_5_0)) { + out.writeCollection(failedNodeIds, 
StreamOutput::writeString); + } } /** @@ -354,6 +370,19 @@ public AllocationStatus getLastAllocationStatus() { return lastAllocationStatus; } + /** + * A set of nodeIds that failed to complete allocations for this shard. {@link org.elasticsearch.gateway.ReplicaShardAllocator} + * uses this set to avoid repeatedly canceling ongoing recoveries for copies on those nodes although they can perform noop recoveries. + * This set will be discarded when a shard moves to started. And if a shard is failed while started (i.e., from started to unassigned), + * the currently assigned node won't be added to this set. + * + * @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation) + * @see org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List) + */ + public Set getFailedNodeIds() { + return failedNodeIds; + } + /** * Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings. * Only relevant if shard is effectively delayed (see {@link #isDelayed()}) @@ -410,6 +439,9 @@ public String shortSummary() { if (failedAllocations > 0) { sb.append(", failed_attempts[").append(failedAllocations).append("]"); } + if (failedNodeIds.isEmpty() == false) { + sb.append(", failed_nodes[").append(failedNodeIds).append("]"); + } sb.append(", delayed=").append(delayed); String details = getDetails(); @@ -433,6 +465,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (failedAllocations > 0) { builder.field("failed_attempts", failedAllocations); } + if (failedNodeIds.isEmpty() == false) { + builder.field("failed_nodes", failedNodeIds); + } builder.field("delayed", delayed); String details = getDetails(); if (details != null) { @@ -466,13 +501,16 @@ public boolean equals(Object o) { if (reason != that.reason) { return false; } - if (message != null ? 
!message.equals(that.message) : that.message != null) { + if (Objects.equals(message, that.message) == false) { return false; } if (lastAllocationStatus != that.lastAllocationStatus) { return false; } - return !(failure != null ? !failure.equals(that.failure) : that.failure != null); + if (Objects.equals(failure, that.failure) == false) { + return false; + } + return failedNodeIds.equals(that.failedNodeIds); } @Override @@ -484,6 +522,7 @@ public int hashCode() { result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); + result = 31 * result + failedNodeIds.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b238946e0f666..0565fdf43b493 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -45,9 +45,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -193,10 +195,18 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis shardToFail.shardId(), shardToFail, failedShard); } int failedAllocations = failedShard.unassignedInfo() != null ? 
failedShard.unassignedInfo().getNumFailedAllocations() : 0; + final Set failedNodeIds; + if (failedShard.unassignedInfo() != null) { + failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1); + failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds()); + failedNodeIds.add(failedShard.currentNodeId()); + } else { + failedNodeIds = Collections.emptySet(); + } String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, failedNodeIds); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -288,8 +298,8 @@ private void removeDelayMarkers(RoutingAllocation allocation) { if (newComputedLeftDelayNanos == 0) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()), - shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } } @@ -307,7 +317,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), 
allocation.changes()); + unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); } } @@ -416,7 +426,8 @@ private void disassociateDeadNodes(RoutingAllocation allocation) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, + Collections.emptySet()); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index a1549c5e217a4..5bacf6602b1c0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -152,7 +152,8 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), 
AllocationStatus.DECIDERS_NO), + unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 2e3219e67c7ae..08f64407f6d09 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.Collections; import java.util.Optional; /** @@ -139,7 +140,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus()); + shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet()); } initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2eda9da8e1086..03bc84477e95b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -43,10 +43,12 @@ import 
org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -95,7 +97,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false); + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false); if (matchingNodes.getNodeWithHighestMatch() != null) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); @@ -106,11 +108,13 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f // we found a better match that can perform noop recovery, cancel the existing allocation. logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); + final Set failedNodeIds = + shard.unassignedInfo() == null ? 
Collections.emptySet() : shard.unassignedInfo().getFailedNodeIds(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); @@ -186,7 +190,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.NOT_TAKEN; } - MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain); + MatchingNodes matchingNodes = findMatchingNodes( + unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); @@ -297,7 +302,7 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore return nodeFilesStore.storeFilesMetaData(); } - private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, + private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { @@ -305,6 +310,10 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al Map nodeDecisions = explain ? 
new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); + if (noMatchFailedNodes && shard.unassignedInfo() != null && + shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { + continue; + } TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); // we don't have any files at all, it is an empty index if (storeFilesMetaData.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 7c43729a7cd25..399bd72ae0118 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.ByteBufferStreamInput; import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; @@ -45,6 +46,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; @@ -81,9 +85,12 @@ public void testReasonOrdinalOrder() { public void testSerialization() throws Exception { UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); + int failedAllocations = randomIntBetween(1, 100); + Set failedNodes = IntStream.range(0, between(0, failedAllocations)) + .mapToObj(n -> "failed-node-" + 
n).collect(Collectors.toSet()); UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): + failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes): new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); @@ -95,17 +102,25 @@ public void testSerialization() throws Exception { assertThat(read.getMessage(), equalTo(meta.getMessage())); assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); + assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); } public void testBwcSerialization() throws Exception { final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CLOSED, "message"); BytesStreamOutput out = new BytesStreamOutput(); - out.setVersion(VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, VersionUtils.getPreviousVersion(Version.V_7_0_0))); + Version version = VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.CURRENT); + out.setVersion(version); unassignedInfo.writeTo(out); out.close(); - UnassignedInfo read = new UnassignedInfo(out.bytes().streamInput()); - assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED)); + StreamInput in = out.bytes().streamInput(); + in.setVersion(version); + UnassignedInfo read = new UnassignedInfo(in); + if (version.before(Version.V_7_0_0)) { + assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.REINITIALIZED)); + } else { + assertThat(read.getReason(), equalTo(UnassignedInfo.Reason.INDEX_CLOSED)); + } assertThat(read.getUnassignedTimeInMillis(), equalTo(unassignedInfo.getUnassignedTimeInMillis())); 
assertThat(read.getMessage(), equalTo(unassignedInfo.getMessage())); assertThat(read.getDetails(), equalTo(unassignedInfo.getDetails())); @@ -312,7 +327,7 @@ public void testFailedShard() { public void testRemainingDelayCalculation() throws Exception { final long baseTime = System.nanoTime(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT); + System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet()); final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); final Settings indexSettings = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java new file mode 100644 index 0000000000000..829b7ef1c9713 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; + +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +public class TrackFailedAllocationNodesTests extends ESAllocationTestCase { + + public void testTrackFailedNodes() { + int maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); + AllocationService allocationService = createAllocationService(); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + for (int i = 0; i < 5; i++) { + discoNodes.add(newNode("node-" + i)); + } + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(discoNodes) + .metaData(metaData).routingTable(RoutingTable.builder().addAsNew(metaData.index("idx")).build()) + .build(); + clusterState = allocationService.reroute(clusterState, "reroute"); + Set failedNodeIds = new HashSet<>(); + + // track the failed nodes if shard is not started + 
for (int i = 0; i < maxRetries; i++) { + failedNodeIds.add(clusterState.routingTable().index("idx").shard(0).shards().get(0).currentNodeId()); + clusterState = allocationService.applyFailedShard( + clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), randomBoolean()); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), + equalTo(failedNodeIds)); + } + + // reroute with retryFailed=true should discard the failedNodes + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED)); + clusterState = allocationService.reroute(clusterState, new AllocationCommands(), false, true).getClusterState(); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); + + // do not track the failed nodes while shard is started + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.STARTED)); + clusterState = allocationService.applyFailedShard( + clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), false); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index cfb0979a9ee40..fa4cba9a2c32c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -117,8 +117,8 @@ public 
void testCanAllocatePrimaryExistingInRestoreInProgress() { UnassignedInfo currentInfo = primary.unassignedInfo(); UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), - currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), - currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus()); + currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), + currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index d27078b7e796b..63f4e225a1645 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -26,6 +26,8 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -282,6 +284,45 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching)))); } + /** + * Make sure that we do not repeatedly cancel an ongoing recovery for a noop copy on a broken node. 
+ */ + public void testDoNotCancelRecoveryForBrokenNode() throws Exception { + internalCluster().startMasterOnlyNode(); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); + String indexName = "test"; + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + String brokenNode = internalCluster().startDataOnlyNode(); + MockTransportService transportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + CountDownLatch newNodeStarted = new CountDownLatch(1); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { + if (brokenNode.equals(connection.getNode().getName())) { + try { + newNodeStarted.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT); + } + } + connection.sendRequest(requestId, action, request, options); + }); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().startDataOnlyNode(); + newNodeStarted.countDown(); + ensureGreen(indexName); + transportService.clearAllRules(); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) 
throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 821e721a1ac27..6947250b9bbbe 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -61,12 +61,16 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class ReplicaShardAllocatorTests extends ESAllocationTestCase { private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT @@ -182,7 +186,17 @@ public void testPreferCopyWithHighestMatchingOperations() { } public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + final Set failedNodes; + if (randomBoolean()) { + failedNodes = Collections.emptySet(); + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + failedNodes = new HashSet<>(randomSubsetOf(Arrays.asList("node-4", "node-5", "node-6"))); + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); + } + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); 
long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)), "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); @@ -190,8 +204,11 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + List unassignedShards = allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED); + assertThat(unassignedShards, hasSize(1)); + assertThat(unassignedShards.get(0).shardId(), equalTo(shardId)); + assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0)); + assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), equalTo(failedNodes)); } public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() { @@ -357,7 +374,15 @@ public void testCancelRecoveryBetterSyncId() { } public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + if (randomBoolean()) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.singleton("node-4")); + } + RoutingAllocation 
allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); List retentionLeases = new ArrayList<>(); if (randomBoolean()) { long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); @@ -388,6 +413,28 @@ public void testNotCancellingRecovery() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } + public void testDoNotCancelForBrokenNode() { + Set failedNodes = new HashSet<>(); + failedNodes.add(node3.getId()); + if (randomBoolean()) { + failedNodes.add("node4"); + } + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, + randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); + long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); + List retentionLeases = Arrays.asList( + newRetentionLease(node1, retainingSeqNoOnPrimary), newRetentionLease(node3, retainingSeqNoOnPrimary)); + testAllocator + .addData(node1, retentionLeases, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty()); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED); } @@ -410,8 +457,8 @@ private 
RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide .addShard(ShardRouting.newUnassigned(shardId, false, RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), - System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT) - )) + System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet()))) .build()) ) .build(); @@ -422,7 +469,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } - private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)) @@ -434,8 +481,7 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid .addIndexShard(new IndexShardRoutingTable.Builder(shardId) .addShard(primaryShard) .addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, - ShardRoutingState.INITIALIZING, - new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) + ShardRoutingState.INITIALIZING, unassignedInfo)) .build()) ) .build(); @@ -446,6 +492,10 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + return onePrimaryOnNode1And1ReplicaRecovering(deciders, new 
UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + } + static RetentionLease newRetentionLease(DiscoveryNode node, long retainingSeqNo) { return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()), retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 33cdaa55e40b0..5ebf84a8911ce 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -997,7 +997,6 @@ public void testHistoryRetention() throws Exception { assertThat(recoveryStates.get(0).getTranslog().recoveredOperations(), greaterThan(0)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47974") public void testDoNotInfinitelyWaitForMapping() { internalCluster().ensureAtLeastNumDataNodes(3); createIndex("test", Settings.builder()