Skip to content

Commit

Permalink
Abort snapshots on a node that leaves the cluster (#21084)
Browse files Browse the repository at this point in the history
Previously, if a node left the cluster during a snapshot (for example, due to
a long GC), the master node would mark the snapshot as failed, but the node
itself could continue snapshotting the data on its shards to the repository.
If the node then rejoined the cluster, the master might assign it to hold the
replica shard (where it held the primary before getting kicked off the
cluster). The initialization of the replica shard would repeatedly fail with a
ShardLockObtainFailedException until the snapshot thread finally finished and
relinquished the lock on the Store.

This commit resolves the situation by ensuring that when a shard is removed
from a node (such as when a node rejoins the cluster and realizes it no longer
holds the active shard copy), any snapshotting of the removed shards is aborted.
In the scenario above, when the node rejoins the cluster, it will see in the cluster
state that it no longer holds the primary shard, so IndicesClusterStateService
will remove the shard, thereby causing any snapshots of that shard to be aborted.

Closes #20876
  • Loading branch information
Ali Beyad committed Oct 26, 2016
1 parent 457080d commit 1d278d2
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,9 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {


public static class ShardSnapshotStatus {
private State state;
private String nodeId;
private String reason;

private ShardSnapshotStatus() {
}
private final State state;
private final String nodeId;
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
Expand All @@ -231,6 +228,12 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
this.reason = reason;
}

/**
 * Reads a shard snapshot status from the given stream.
 * <p>
 * Fields are read in this fixed order: an optional node id, a single byte
 * that is converted to a {@code State} via {@code State.fromValue}, and an
 * optional reason. The order is part of the serialized format, so it must
 * not be rearranged.
 *
 * @param in the stream to read the status from
 * @throws IOException declared for failures while reading from the stream
 */
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

/**
 * Returns the {@code state} held by this status object.
 */
public State state() {
return state;
}
Expand All @@ -243,18 +246,6 @@ public String reason() {
return reason;
}

public static ShardSnapshotStatus readShardSnapshotStatus(StreamInput in) throws IOException {
ShardSnapshotStatus shardSnapshotStatus = new ShardSnapshotStatus();
shardSnapshotStatus.readFrom(in);
return shardSnapshotStatus;
}

public void readFrom(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
Expand Down Expand Up @@ -282,6 +273,11 @@ public int hashCode() {
result = 31 * result + (reason != null ? reason.hashCode() : 0);
return result;
}

/**
 * Returns a human-readable representation of this status in the form
 * {@code ShardSnapshotStatus[state=..., nodeId=..., reason=...]}.
 */
@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
}
}

public enum State {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -399,9 +400,7 @@ public Delta delta(DiscoveryNodes other) {
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{");
for (DiscoveryNode node : this) {
sb.append(node).append(',');
}
sb.append(Strings.collectionToDelimitedString(this, ","));
sb.append("}");
return sb.toString();
}
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
}
} finally {
try {
store.close();
if (store != null) {
store.close();
} else {
logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId);
}
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class IndexShardSnapshotStatus {
/**
* Snapshot stage
*/
public static enum Stage {
public enum Stage {
/**
* Snapshot hasn't started yet
*/
Expand Down Expand Up @@ -66,7 +66,7 @@ public static enum Stage {

private long indexVersion;

private boolean aborted;
private volatile boolean aborted;

private String failure;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotShardsService;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -113,10 +114,11 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService) {
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
this(settings, (AllocatedIndices<? extends Shard, ? extends AllocatedIndex<? extends Shard>>) indicesService,
clusterService, threadPool, recoveryTargetService, shardStateAction,
nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService);
nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService,
snapshotShardsService);
}

// for tests
Expand All @@ -128,9 +130,10 @@ public IndicesClusterStateService(Settings settings, IndicesService indicesServi
NodeMappingRefreshAction nodeMappingRefreshAction,
RepositoriesService repositoriesService, RestoreService restoreService,
SearchService searchService, SyncedFlushService syncedFlushService,
PeerRecoverySourceService peerRecoverySourceService) {
PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) {
super(settings);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService);
this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService,
snapshotShardsService);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down
Loading

0 comments on commit 1d278d2

Please sign in to comment.