Abort snapshots on a node that leaves the cluster
Previously, if a node left the cluster (for example, due to a long GC)
during a snapshot, the master node would mark the snapshot as failed, but
the node itself could continue snapshotting the data on its shards to the
repository.  If the node rejoined the cluster, the master could assign it
the replica shard (where it held the primary before getting kicked off the
cluster).  Initialization of the replica shard would then repeatedly fail
with a ShardLockObtainFailedException until the snapshot thread finally
finished and relinquished the lock on the Store.

This commit resolves the situation by ensuring that the shard snapshot is
aborted when the node responsible for that shard's snapshot leaves the cluster.
When the node rejoins the cluster, it will see in the cluster state that
the snapshot for that shard has failed and will abort the snapshot locally,
allowing the shard data directory to be freed for allocation of a replica
shard on the same node.
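
Roughly, the data-node side of this works as sketched below: on each cluster
state update, any in-flight local shard snapshot that the master has marked
as failed for this node gets aborted.  This is a minimal illustrative sketch;
the types and names here (MasterShardStatus, LocalShardSnapshot,
AbortOnRejoinSketch) are simplified stand-ins, not the actual Elasticsearch
classes touched by this commit.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class AbortOnRejoinSketch {

    enum ShardState { INIT, STARTED, FAILED, SUCCESS }

    // Per-shard snapshot status as published by the master in the cluster state.
    static final class MasterShardStatus {
        final String nodeId;
        final ShardState state;
        MasterShardStatus(String nodeId, ShardState state) {
            this.nodeId = nodeId;
            this.state = state;
        }
    }

    // A shard snapshot still running on this node.
    static final class LocalShardSnapshot {
        // volatile so the thread doing the snapshot observes the abort promptly
        volatile boolean aborted;
        void abort() { aborted = true; }
    }

    // In-flight shard snapshots on this node, keyed by shard id.
    private final Map<String, LocalShardSnapshot> localSnapshots = new ConcurrentHashMap<>();

    // Called on every cluster state update, including the one the node
    // processes right after rejoining the cluster.
    void applyClusterState(String localNodeId, Map<String, MasterShardStatus> masterShards) {
        for (Map.Entry<String, MasterShardStatus> entry : masterShards.entrySet()) {
            MasterShardStatus master = entry.getValue();
            LocalShardSnapshot local = localSnapshots.get(entry.getKey());
            // The master marked this shard's snapshot FAILED while the node was
            // away; abort the still-running local snapshot so the snapshot
            // thread releases the shard lock and the replica can be allocated.
            if (local != null && master.state == ShardState.FAILED && localNodeId.equals(master.nodeId)) {
                local.abort();
            }
        }
    }
}

The volatile flag in the sketch mirrors the change below that makes the
aborted flag in IndexShardSnapshotStatus volatile, so the abort becomes
visible to the snapshotting thread without additional locking.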

Closes elastic#20876
Ali Beyad committed Oct 23, 2016
1 parent 3d2e885 commit 9e895b4
Showing 8 changed files with 261 additions and 48 deletions.
@@ -210,12 +210,9 @@ public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {


public static class ShardSnapshotStatus {
private State state;
private String nodeId;
private String reason;

private ShardSnapshotStatus() {
}
private final State state;
private final String nodeId;
private final String reason;

public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
@@ -231,6 +228,12 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
this.reason = reason;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public State state() {
return state;
}
@@ -243,18 +246,6 @@ public String reason() {
return reason;
}

public static ShardSnapshotStatus readShardSnapshotStatus(StreamInput in) throws IOException {
ShardSnapshotStatus shardSnapshotStatus = new ShardSnapshotStatus();
shardSnapshotStatus.readFrom(in);
return shardSnapshotStatus;
}

public void readFrom(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
reason = in.readOptionalString();
}

public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
@@ -282,6 +273,11 @@ public int hashCode() {
result = 31 * result + (reason != null ? reason.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]";
}
}

public enum State {
@@ -402,6 +402,10 @@ public String toString() {
for (DiscoveryNode node : this) {
sb.append(node).append(',');
}
if (sb.length() > 1) {
// trim off last comma
sb.setLength(sb.length() - 1);
}
sb.append("}");
return sb.toString();
}
6 changes: 5 additions & 1 deletion core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -411,7 +411,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
}
} finally {
try {
store.close();
if (store != null) {
store.close();
} else {
logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId);
}
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
@@ -27,7 +27,7 @@ public class IndexShardSnapshotStatus {
/**
* Snapshot stage
*/
public static enum Stage {
public enum Stage {
/**
* Snapshot hasn't started yet
*/
@@ -66,7 +66,7 @@ public static enum Stage {

private long indexVersion;

private boolean aborted;
private volatile boolean aborted;

private String failure;

@@ -29,6 +29,7 @@
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -47,6 +48,7 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus.Stage;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
@@ -156,12 +158,8 @@ public void clusterChanged(ClusterChangedEvent event) {
SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE);

if (prev == null) {
if (curr != null) {
processIndexShardSnapshots(event);
}
} else if (prev.equals(curr) == false) {
processIndexShardSnapshots(event);
if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) {
processIndexShardSnapshots(event);
}
String masterNodeId = event.state().nodes().getMasterNodeId();
if (masterNodeId != null && masterNodeId.equals(event.previousState().nodes().getMasterNodeId()) == false) {
@@ -205,6 +203,16 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
final Snapshot snapshot = entry.getKey();
if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) {
survivors.put(entry.getKey(), entry.getValue());
} else {
// abort any running snapshots of shards for the removed entry;
// this could happen if for some reason the cluster state update for aborting
// running shards is missed, then the snapshot is removed in a subsequent cluster
// state update, which is being processed here
for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) {
if (snapshotStatus.stage() == Stage.INIT || snapshotStatus.stage() == Stage.STARTED) {
snapshotStatus.abort();
}
}
}
}

@@ -221,12 +229,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
if (entry.state() == SnapshotsInProgress.State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = new HashMap<>();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.value.nodeId())) {
if (shard.value.state() == SnapshotsInProgress.State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) {
logger.trace("[{}] - Adding shard to the queue", shard.key);
startedShards.put(shard.key, new IndexShardSnapshotStatus());
} else if (shard.value.state().failed() && snapshotShards != null) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key);
if (snapshotStatus != null) {
logger.trace("[{}] shard snapshot failed, abort snapshotting on the local node", shard.key);
snapshotStatus.abort();
}
}
}
}
@@ -249,7 +263,7 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
if (snapshotShards != null) {
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shard : entry.shards()) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key);
if (snapshotStatus != null) {
switch (snapshotStatus.stage()) {
@@ -263,19 +277,33 @@
case DONE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
break;
case FAILURE:
logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key);
updateIndexShardSnapshotStatus(entry.snapshot(), shard.key,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure()));
break;
default:
throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage());
}
}
}
}
} else if (entry.state() == SnapshotsInProgress.State.SUCCESS) {
// the snapshot as a whole completed, but for any shards that failed, make sure local snapshotting is aborted
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot());
if (snapshotShards != null) {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shard : entry.shards()) {
if (localNodeId.equals(shard.value.nodeId()) && shard.value.state().failed()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key);
if (snapshotStatus != null) {
logger.trace("[{}] shard snapshot failed, abort snapshotting on the local node", shard.key);
snapshotStatus.abort();
}
}
}
}
}
}
}
@@ -309,18 +337,18 @@ private void processIndexShardSnapshots(ClusterChangedEvent event) {
@Override
public void doRun() {
snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue());
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS));
}

@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e);
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
}

});
} catch (Exception e) {
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e)));
}
}
}
@@ -383,23 +411,23 @@ private void syncShardStatsOnNewMaster(ClusterChangedEvent event) {
if (snapshot.state() == SnapshotsInProgress.State.STARTED || snapshot.state() == SnapshotsInProgress.State.ABORTED) {
Map<ShardId, IndexShardSnapshotStatus> localShards = currentSnapshotShards(snapshot.snapshot());
if (localShards != null) {
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> masterShards = snapshot.shards();
ImmutableOpenMap<ShardId, ShardSnapshotStatus> masterShards = snapshot.shards();
for(Map.Entry<ShardId, IndexShardSnapshotStatus> localShard : localShards.entrySet()) {
ShardId shardId = localShard.getKey();
IndexShardSnapshotStatus localShardStatus = localShard.getValue();
SnapshotsInProgress.ShardSnapshotStatus masterShard = masterShards.get(shardId);
ShardSnapshotStatus masterShard = masterShards.get(shardId);
if (masterShard != null && masterShard.state().completed() == false) {
// Master knows about the shard and thinks it has not completed
if (localShardStatus.stage() == IndexShardSnapshotStatus.Stage.DONE) {
if (localShardStatus.stage() == Stage.DONE) {
// but we think the shard is done - we need to make new master know that the shard is done
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
} else if (localShard.getValue().stage() == IndexShardSnapshotStatus.Stage.FAILURE) {
new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS));
} else if (localShard.getValue().stage() == Stage.FAILURE) {
// but we think the shard failed - we need to make new master know that the shard failed
logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId);
updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId,
new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));
new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure()));

}
}
@@ -427,15 +455,15 @@ private SnapshotShards(Map<ShardId, IndexShardSnapshotStatus> shards) {
public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest {
private Snapshot snapshot;
private ShardId shardId;
private SnapshotsInProgress.ShardSnapshotStatus status;
private ShardSnapshotStatus status;

private volatile boolean processed; // state field, no need to serialize

public UpdateIndexShardSnapshotStatusRequest() {

}

public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
@@ -446,7 +474,7 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
snapshot = new Snapshot(in);
shardId = ShardId.readShardId(in);
status = SnapshotsInProgress.ShardSnapshotStatus.readShardSnapshotStatus(in);
status = new ShardSnapshotStatus(in);
}

@Override
@@ -465,7 +493,7 @@ public ShardId shardId() {
return shardId;
}

public SnapshotsInProgress.ShardSnapshotStatus status() {
public ShardSnapshotStatus status() {
return status;
}

@@ -486,7 +514,7 @@ public boolean isProcessed() {
/**
* Updates the shard status
*/
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status);
try {
if (clusterService.state().nodes().isLocalNodeElectedMaster()) {
@@ -533,7 +561,7 @@ public ClusterState execute(ClusterState currentState) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;

for (int i = 0; i < batchSize; i++) {
@@ -83,6 +83,7 @@
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.SnapshotsInProgress.PROTO;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;

/**
@@ -793,12 +794,12 @@ private boolean waitingShardsStartedOrUnassigned(ClusterChangedEvent event) {
}

private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) {
// Check if we just became the master
boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
return false;
}
// Check if we just became the master
boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster();
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) {
// We just replaced old master and snapshots in intermediate states needs to be cleaned