diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 4750476805d88..e286a1f52ae19 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -547,7 +547,7 @@ assert getByAllocationId(failedShard.shardId(), failedShard.allocationId().getId assert replicaShard != null : "failed to re-resolve " + routing + " when failing replicas"; UnassignedInfo primaryFailedUnassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing", null, 0, unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT); + unassignedInfo.getUnassignedTimeInMillis(), false, AllocationStatus.NO_ATTEMPT, Collections.emptySet()); failShard(logger, replicaShard, primaryFailedUnassignedInfo, indexMetaData, routingChangesObserver); } } @@ -873,7 +873,7 @@ public void ignoreShard(ShardRouting shard, AllocationStatus allocationStatus, R UnassignedInfo newInfo = new UnassignedInfo(currInfo.getReason(), currInfo.getMessage(), currInfo.getFailure(), currInfo.getNumFailedAllocations(), currInfo.getUnassignedTimeInNanos(), currInfo.getUnassignedTimeInMillis(), currInfo.isDelayed(), - allocationStatus); + allocationStatus, currInfo.getFailedNodeIds()); ShardRouting updatedShard = shard.updateUnassigned(newInfo, shard.recoverySource()); changes.unassignedInfoUpdated(shard, newInfo); shard = updatedShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index eaeb6e0402076..7152f27d36ee5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -20,8 +20,10 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -38,8 +40,11 @@ import java.io.IOException; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Collections; +import java.util.List; import java.util.Locale; import java.util.Objects; +import java.util.Set; /** * Holds additional information as to why the shard is in unassigned state. @@ -213,6 +218,7 @@ public String value() { private final String message; private final Exception failure; private final int failedAllocations; + private final Set failedNodeIds; private final AllocationStatus lastAllocationStatus; // result of the last allocation attempt for this shard /** @@ -223,7 +229,7 @@ public String value() { **/ public UnassignedInfo(Reason reason, String message) { this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, Collections.emptySet()); } /** @@ -234,9 +240,11 @@ public UnassignedInfo(Reason reason, String message) { * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING. * @param lastAllocationStatus the result of the last allocation attempt for this shard + * @param failedNodeIds a set of nodeIds that failed to complete allocations for this shard */ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Exception failure, int failedAllocations, - long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus) { + long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed, AllocationStatus lastAllocationStatus, + Set failedNodeIds) { this.reason = Objects.requireNonNull(reason); this.unassignedTimeMillis = unassignedTimeMillis; this.unassignedTimeNanos = unassignedTimeNanos; @@ -245,6 +253,7 @@ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Excepti this.failure = failure; this.failedAllocations = failedAllocations; this.lastAllocationStatus = Objects.requireNonNull(lastAllocationStatus); + this.failedNodeIds = Collections.unmodifiableSet(failedNodeIds); assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) : "failedAllocations: " + failedAllocations + " for reason " + reason; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; @@ -262,6 +271,11 @@ public UnassignedInfo(StreamInput in) throws IOException { this.failure = in.readException(); this.failedAllocations = in.readVInt(); this.lastAllocationStatus = AllocationStatus.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + this.failedNodeIds = Collections.unmodifiableSet(in.readSet(StreamInput::readString)); + } else { + this.failedNodeIds = Collections.emptySet(); + } } public void writeTo(StreamOutput out) throws IOException { @@ -273,6 +287,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeException(failure); out.writeVInt(failedAllocations); lastAllocationStatus.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeCollection(failedNodeIds, StreamOutput::writeString); + } } /** @@ -347,6 +364,19 @@ public AllocationStatus getLastAllocationStatus() { return lastAllocationStatus; } + /** + * A set of nodeIds that failed to complete allocations for this shard. {@link org.elasticsearch.gateway.ReplicaShardAllocator} + * uses this set to avoid repeatedly canceling ongoing recoveries for copies on those nodes although they can perform noop recoveries. + * This set will be discarded when a shard moves to started. And if a shard is failed while started (i.e., from started to unassigned), + * the currently assigned node won't be added to this set. + * + * @see org.elasticsearch.gateway.ReplicaShardAllocator#processExistingRecoveries(RoutingAllocation) + * @see org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List) + */ + public Set getFailedNodeIds() { + return failedNodeIds; + } + /** * Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings. * Only relevant if shard is effectively delayed (see {@link #isDelayed()}) @@ -403,6 +433,9 @@ public String shortSummary() { if (failedAllocations > 0) { sb.append(", failed_attempts[").append(failedAllocations).append("]"); } + if (failedNodeIds.isEmpty() == false) { + sb.append(", failed_nodes[").append(failedNodeIds).append("]"); + } sb.append(", delayed=").append(delayed); String details = getDetails(); @@ -426,6 +459,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (failedAllocations > 0) { builder.field("failed_attempts", failedAllocations); } + if (failedNodeIds.isEmpty() == false) { + builder.field("failed_nodes", failedNodeIds); + } builder.field("delayed", delayed); String details = getDetails(); if (details != null) { @@ -459,13 +495,16 @@ public boolean equals(Object o) { if (reason != that.reason) { return false; } - if (message != null ? !message.equals(that.message) : that.message != null) { + if (Objects.equals(message, that.message) == false) { return false; } if (lastAllocationStatus != that.lastAllocationStatus) { return false; } - return !(failure != null ? !failure.equals(that.failure) : that.failure != null); + if (Objects.equals(failure, that.failure) == false) { + return false; + } + return failedNodeIds.equals(that.failedNodeIds); } @Override @@ -477,6 +516,7 @@ public int hashCode() { result = 31 * result + (message != null ? message.hashCode() : 0); result = 31 * result + (failure != null ? failure.hashCode() : 0); result = 31 * result + lastAllocationStatus.hashCode(); + result = 31 * result + failedNodeIds.hashCode(); return result; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f158103ed226c..b7c288b703ed1 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -46,9 +46,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -194,10 +196,18 @@ public ClusterState applyFailedShards(final ClusterState clusterState, final Lis shardToFail.shardId(), shardToFail, failedShard); } int failedAllocations = failedShard.unassignedInfo() != null ? failedShard.unassignedInfo().getNumFailedAllocations() : 0; + final Set failedNodeIds; + if (failedShard.unassignedInfo() != null) { + failedNodeIds = new HashSet<>(failedShard.unassignedInfo().getFailedNodeIds().size() + 1); + failedNodeIds.addAll(failedShard.unassignedInfo().getFailedNodeIds()); + failedNodeIds.add(failedShard.currentNodeId()); + } else { + failedNodeIds = Collections.emptySet(); + } String message = "failed shard on node [" + shardToFail.currentNodeId() + "]: " + failedShardEntry.getMessage(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, message, failedShardEntry.getFailure(), failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false, - AllocationStatus.NO_ATTEMPT); + AllocationStatus.NO_ATTEMPT, failedNodeIds); if (failedShardEntry.markAsStale()) { allocation.removeAllocationId(failedShard); } @@ -289,8 +299,8 @@ private void removeDelayMarkers(RoutingAllocation allocation) { if (newComputedLeftDelayNanos == 0) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus()), - shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getUnassignedTimeInMillis(), false, unassignedInfo.getLastAllocationStatus(), + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } } @@ -308,7 +318,7 @@ private void resetFailedAllocationCounter(RoutingAllocation allocation) { UnassignedInfo.Reason.MANUAL_ALLOCATION : unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), 0, unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), - unassignedInfo.getLastAllocationStatus()), shardRouting.recoverySource(), allocation.changes()); + unassignedInfo.getLastAllocationStatus(), Collections.emptySet()), shardRouting.recoverySource(), allocation.changes()); } } @@ -421,7 +431,8 @@ private void disassociateDeadNodes(RoutingAllocation allocation) { final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index()); boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0; UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]", - null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT); + null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT, + Collections.emptySet()); allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes()); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ba92b7a20455c..9a25c45ea355d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -152,7 +152,8 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { if (shardRouting.primary() && unassignedInfo.getLastAllocationStatus() == AllocationStatus.NO_ATTEMPT) { unassignedIterator.updateUnassigned(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(), unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), - unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO), + unassignedInfo.getUnassignedTimeInMillis(), unassignedInfo.isDelayed(), AllocationStatus.DECIDERS_NO, + unassignedInfo.getFailedNodeIds()), shardRouting.recoverySource(), allocation.changes()); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java index 2e3219e67c7ae..08f64407f6d09 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java @@ -38,6 +38,7 @@ import org.elasticsearch.index.shard.ShardNotFoundException; import java.io.IOException; +import java.util.Collections; import java.util.Optional; /** @@ -139,7 +140,7 @@ public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) ", " + shardRouting.unassignedInfo().getMessage(); unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.FORCED_EMPTY_PRIMARY, unassignedInfoMessage, shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false, - shardRouting.unassignedInfo().getLastAllocationStatus()); + shardRouting.unassignedInfo().getLastAllocationStatus(), Collections.emptySet()); } initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate, diff --git a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 2eda9da8e1086..03bc84477e95b 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -43,10 +43,12 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING; @@ -95,7 +97,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) { continue; } - MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, primaryNode, primaryStore, shardStores, false); + MatchingNodes matchingNodes = findMatchingNodes(shard, allocation, true, primaryNode, primaryStore, shardStores, false); if (matchingNodes.getNodeWithHighestMatch() != null) { DiscoveryNode currentNode = allocation.nodes().get(shard.currentNodeId()); DiscoveryNode nodeWithHighestMatch = matchingNodes.getNodeWithHighestMatch(); @@ -106,11 +108,13 @@ && canPerformOperationBasedRecovery(primaryStore, shardStores, currentNode) == f // we found a better match that can perform noop recovery, cancel the existing allocation. logger.debug("cancelling allocation of replica on [{}], can perform a noop recovery on node [{}]", currentNode, nodeWithHighestMatch); + final Set failedNodeIds = + shard.unassignedInfo() == null ? Collections.emptySet() : shard.unassignedInfo().getFailedNodeIds(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, "existing allocation of replica to [" + currentNode + "] cancelled, can perform a noop recovery on ["+ nodeWithHighestMatch + "]", null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false, - UnassignedInfo.AllocationStatus.NO_ATTEMPT); + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodeIds); // don't cancel shard in the loop as it will cause a ConcurrentModificationException shardCancellationActions.add(() -> routingNodes.failShard(logger, shard, unassignedInfo, metaData.getIndexSafe(shard.index()), allocation.changes())); @@ -186,7 +190,8 @@ public AllocateUnassignedDecision makeAllocationDecision(final ShardRouting unas return AllocateUnassignedDecision.NOT_TAKEN; } - MatchingNodes matchingNodes = findMatchingNodes(unassignedShard, allocation, primaryNode, primaryStore, shardStores, explain); + MatchingNodes matchingNodes = findMatchingNodes( + unassignedShard, allocation, false, primaryNode, primaryStore, shardStores, explain); assert explain == false || matchingNodes.nodeDecisions != null : "in explain mode, we must have individual node decisions"; List nodeDecisions = augmentExplanationsWithStoreInfo(result.v2(), matchingNodes.nodeDecisions); @@ -297,7 +302,7 @@ private static TransportNodesListShardStoreMetaData.StoreFilesMetaData findStore return nodeFilesStore.storeFilesMetaData(); } - private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, + private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation allocation, boolean noMatchFailedNodes, DiscoveryNode primaryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData primaryStore, AsyncShardFetch.FetchResult data, boolean explain) { @@ -305,6 +310,10 @@ private MatchingNodes findMatchingNodes(ShardRouting shard, RoutingAllocation al Map nodeDecisions = explain ? new HashMap<>() : null; for (Map.Entry nodeStoreEntry : data.getData().entrySet()) { DiscoveryNode discoNode = nodeStoreEntry.getKey(); + if (noMatchFailedNodes && shard.unassignedInfo() != null && + shard.unassignedInfo().getFailedNodeIds().contains(discoNode.getId())) { + continue; + } TransportNodesListShardStoreMetaData.StoreFilesMetaData storeFilesMetaData = nodeStoreEntry.getValue().storeFilesMetaData(); // we don't have any files at all, it is an empty index if (storeFilesMetaData.isEmpty()) { diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 20216af9bf38e..eb5b5c10c843c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -44,6 +44,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; @@ -80,9 +83,12 @@ public void testReasonOrdinalOrder() { public void testSerialization() throws Exception { UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values()); + int failedAllocations = randomIntBetween(1, 100); + Set failedNodes = IntStream.range(0, between(0, failedAllocations)) + .mapToObj(n -> "failed-node-" + n).collect(Collectors.toSet()); UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ? new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null, null, - randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT): + failedAllocations, System.nanoTime(), System.currentTimeMillis(), false, AllocationStatus.NO_ATTEMPT, failedNodes): new UnassignedInfo(reason, randomBoolean() ? randomAlphaOfLength(4) : null); BytesStreamOutput out = new BytesStreamOutput(); meta.writeTo(out); @@ -94,6 +100,7 @@ public void testSerialization() throws Exception { assertThat(read.getMessage(), equalTo(meta.getMessage())); assertThat(read.getDetails(), equalTo(meta.getDetails())); assertThat(read.getNumFailedAllocations(), equalTo(meta.getNumFailedAllocations())); + assertThat(read.getFailedNodeIds(), equalTo(meta.getFailedNodeIds())); } public void testIndexCreated() { @@ -296,7 +303,7 @@ public void testFailedShard() { public void testRemainingDelayCalculation() throws Exception { final long baseTime = System.nanoTime(); UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, - System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT); + System.currentTimeMillis(), randomBoolean(), AllocationStatus.NO_ATTEMPT, Collections.emptySet()); final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); final Settings indexSettings = Settings.builder() .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java new file mode 100644 index 0000000000000..829b7ef1c9713 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/TrackFailedAllocationNodesTests.java @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; +import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider; +import org.elasticsearch.common.settings.Settings; + +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; + +public class TrackFailedAllocationNodesTests extends ESAllocationTestCase { + + public void testTrackFailedNodes() { + int maxRetries = MaxRetryAllocationDecider.SETTING_ALLOCATION_MAX_RETRY.get(Settings.EMPTY); + AllocationService allocationService = createAllocationService(); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("idx").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0)) + .build(); + DiscoveryNodes.Builder discoNodes = DiscoveryNodes.builder(); + for (int i = 0; i < 5; i++) { + discoNodes.add(newNode("node-" + i)); + } + ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(discoNodes) + .metaData(metaData).routingTable(RoutingTable.builder().addAsNew(metaData.index("idx")).build()) + .build(); + clusterState = allocationService.reroute(clusterState, "reroute"); + Set failedNodeIds = new HashSet<>(); + + // track the failed nodes if shard is not started + for (int i = 0; i < maxRetries; i++) { + failedNodeIds.add(clusterState.routingTable().index("idx").shard(0).shards().get(0).currentNodeId()); + clusterState = allocationService.applyFailedShard( + clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), randomBoolean()); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), + equalTo(failedNodeIds)); + } + + // reroute with retryFailed=true should discard the failedNodes + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.UNASSIGNED)); + clusterState = allocationService.reroute(clusterState, new AllocationCommands(), false, true).getClusterState(); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); + + // do not track the failed nodes while shard is started + clusterState = startInitializingShardsAndReroute(allocationService, clusterState); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).state(), equalTo(ShardRoutingState.STARTED)); + clusterState = allocationService.applyFailedShard( + clusterState, clusterState.routingTable().index("idx").shard(0).shards().get(0), false); + assertThat(clusterState.routingTable().index("idx").shard(0).shards().get(0).unassignedInfo().getFailedNodeIds(), empty()); + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java index 8bb1657b5ef3a..fbedd5e1799c6 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/RestoreInProgressAllocationDeciderTests.java @@ -119,8 +119,8 @@ public void testCanAllocatePrimaryExistingInRestoreInProgress() { UnassignedInfo currentInfo = primary.unassignedInfo(); UnassignedInfo newInfo = new UnassignedInfo(currentInfo.getReason(), currentInfo.getMessage(), new IOException("i/o failure"), - currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), - currentInfo.getUnassignedTimeInMillis(), currentInfo.isDelayed(), currentInfo.getLastAllocationStatus()); + currentInfo.getNumFailedAllocations(), currentInfo.getUnassignedTimeInNanos(), currentInfo.getUnassignedTimeInMillis(), + currentInfo.isDelayed(), currentInfo.getLastAllocationStatus(), currentInfo.getFailedNodeIds()); primary = primary.updateUnassigned(newInfo, primary.recoverySource()); IndexRoutingTable indexRoutingTable = routingTable.index("test"); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java index 040ae9be98c30..554b75a3dc539 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorIT.java @@ -26,6 +26,8 @@ import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.breaker.CircuitBreakingException; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; @@ -282,6 +284,45 @@ public void testPreferCopyWithHighestMatchingOperations() throws Exception { assertThat(internalCluster().nodesInclude(indexName), allOf(hasItem(nodeWithHigherMatching), not(hasItem(nodeWithLowerMatching)))); } + /** + * Make sure that we do not repeatedly cancel an ongoing recovery for a noop copy on a broken node. + */ + public void testDoNotCancelRecoveryForBrokenNode() throws Exception { + internalCluster().startMasterOnlyNode(); + String nodeWithPrimary = internalCluster().startDataOnlyNode(); + String indexName = "test"; + assertAcked(client().admin().indices().prepareCreate(indexName).setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms") + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms"))); + indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, between(200, 500)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("f", "v")).collect(Collectors.toList())); + client().admin().indices().prepareFlush(indexName).get(); + String brokenNode = internalCluster().startDataOnlyNode(); + MockTransportService transportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, nodeWithPrimary); + CountDownLatch newNodeStarted = new CountDownLatch(1); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoveryTargetService.Actions.TRANSLOG_OPS)) { + if (brokenNode.equals(connection.getNode().getName())) { + try { + newNodeStarted.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + throw new CircuitBreakingException("not enough memory for indexing", 100, 50, CircuitBreaker.Durability.TRANSIENT); + } + } + connection.sendRequest(requestId, action, request, options); + }); + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))); + internalCluster().startDataOnlyNode(); + newNodeStarted.countDown(); + ensureGreen(indexName); + transportService.clearAllRules(); + } + private void ensureActivePeerRecoveryRetentionLeasesAdvanced(String indexName) throws Exception { assertBusy(() -> { Index index = resolveIndex(indexName); diff --git a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 821e721a1ac27..a91752a42580b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -61,12 +61,16 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.unmodifiableMap; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; public class ReplicaShardAllocatorTests extends ESAllocationTestCase { private static final org.apache.lucene.util.Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT @@ -182,7 +186,14 @@ public void testPreferCopyWithHighestMatchingOperations() { } public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + if (randomBoolean()) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + } + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); long retainingSeqNo = randomLongBetween(1, Long.MAX_VALUE); testAllocator.addData(node1, Arrays.asList(newRetentionLease(node1, retainingSeqNo), newRetentionLease(node3, retainingSeqNo)), "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); @@ -190,8 +201,11 @@ public void testCancelRecoveryIfFoundCopyWithNoopRetentionLease() { testAllocator.addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); testAllocator.processExistingRecoveries(allocation); assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).get(0).shardId(), equalTo(shardId)); + List unassignedShards = allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED); + assertThat(unassignedShards, hasSize(1)); + assertThat(unassignedShards.get(0).shardId(), equalTo(shardId)); + assertThat(unassignedShards.get(0).unassignedInfo().getNumFailedAllocations(), equalTo(0)); + assertThat(unassignedShards.get(0).unassignedInfo().getFailedNodeIds(), empty()); } public void testNotCancellingRecoveryIfCurrentRecoveryHasRetentionLease() { @@ -357,7 +371,14 @@ public void testCancelRecoveryBetterSyncId() { } public void testNotCancellingRecoveryIfSyncedOnExistingRecovery() { - RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders()); + final UnassignedInfo unassignedInfo; + if (randomBoolean()) { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null); + } else { + unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, randomIntBetween(1, 10), + System.nanoTime(), System.currentTimeMillis(), false, UnassignedInfo.AllocationStatus.NO_ATTEMPT, Set.of("node-4")); + } + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); List retentionLeases = new ArrayList<>(); if (randomBoolean()) { long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); @@ -388,6 +409,28 @@ public void testNotCancellingRecovery() { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(0)); } + public void testDoNotCancelForBrokenNode() { + Set failedNodes = new HashSet<>(); + failedNodes.add(node3.getId()); + if (randomBoolean()) { + failedNodes.add("node4"); + } + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, null, null, + randomIntBetween(failedNodes.size(), 10), System.nanoTime(), System.currentTimeMillis(), false, + UnassignedInfo.AllocationStatus.NO_ATTEMPT, failedNodes); + RoutingAllocation allocation = onePrimaryOnNode1And1ReplicaRecovering(yesAllocationDeciders(), unassignedInfo); + long retainingSeqNoOnPrimary = randomLongBetween(0, Long.MAX_VALUE); + List retentionLeases = Arrays.asList( + newRetentionLease(node1, retainingSeqNoOnPrimary), newRetentionLease(node3, retainingSeqNoOnPrimary)); + testAllocator + .addData(node1, retentionLeases, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node2, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)) + .addData(node3, randomSyncId(), new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM", MIN_SUPPORTED_LUCENE_VERSION)); + testAllocator.processExistingRecoveries(allocation); + assertThat(allocation.routingNodesChanged(), equalTo(false)); + assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED), empty()); + } + private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) { return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED); } @@ -410,8 +453,8 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide .addShard(ShardRouting.newUnassigned(shardId, false, RecoverySource.PeerRecoverySource.INSTANCE, new UnassignedInfo(reason, null, null, failedAllocations, System.nanoTime(), - System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT) - )) + System.currentTimeMillis(), delayed, UnassignedInfo.AllocationStatus.NO_ATTEMPT, + Collections.emptySet()))) .build()) ) .build(); @@ -422,7 +465,7 @@ private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders decide return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } - private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders, UnassignedInfo unassignedInfo) { ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId, node1.getId(), true, ShardRoutingState.STARTED); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder(shardId.getIndexName()).settings(settings(Version.CURRENT)) @@ -434,8 +477,7 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid .addIndexShard(new IndexShardRoutingTable.Builder(shardId) .addShard(primaryShard) .addShard(TestShardRouting.newShardRouting(shardId, node2.getId(), null, false, - ShardRoutingState.INITIALIZING, - new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null))) + ShardRoutingState.INITIALIZING, unassignedInfo)) .build()) ) .build(); @@ -446,6 +488,10 @@ private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDecid return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, System.nanoTime()); } + private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { + return onePrimaryOnNode1And1ReplicaRecovering(deciders, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)); + } + static RetentionLease newRetentionLease(DiscoveryNode node, long retainingSeqNo) { return new RetentionLease(ReplicationTracker.getPeerRecoveryRetentionLeaseId(node.getId()), retainingSeqNo, randomNonNegativeLong(), ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 680e0c052f4c1..6985e33e8afd3 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -998,7 +998,6 @@ public void testHistoryRetention() throws Exception { assertThat(recoveryState.getTranslog().recoveredOperations(), greaterThan(0)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/47974") public void testDoNotInfinitelyWaitForMapping() { internalCluster().ensureAtLeastNumDataNodes(3); createIndex("test", Settings.builder()