Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot/Restore: Ensure that shard failure reasons are correctly stored in CS #25941

Merged
merged 2 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
// If the state is failed we have to have a reason for this failure
assert state.failed() == false || reason != null;
}

public ShardSnapshotStatus(StreamInput in) throws IOException {
Expand Down Expand Up @@ -413,9 +415,15 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
int shards = in.readVInt();
for (int j = 0; j < shards; j++) {
ShardId shardId = ShardId.readShardId(in);
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
// TODO: Change this to an appropriate version when it's backported
if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
builder.put(shardId, new ShardSnapshotStatus(in));
} else {
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
String reason = shardState.failed() ? "" : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a comment here saying why we set reason to ""?

builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason));
}
}
long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID;
if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
Expand Down Expand Up @@ -449,8 +457,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(entry.shards().size());
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : entry.shards()) {
shardEntry.key.writeTo(out);
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
// TODO: Change this to an appropriate version when it's backported
if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
shardEntry.value.writeTo(out);
} else {
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
}
}
if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
out.writeLong(entry.repositoryStateId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public SnapshotShardFailure(@Nullable String nodeId, ShardId shardId, String rea
this.nodeId = nodeId;
this.shardId = shardId;
this.reason = reason;
assert reason != null;
status = RestStatus.INTERNAL_SERVER_ERROR;
}

Expand Down Expand Up @@ -215,6 +216,11 @@ public static SnapshotShardFailure fromXContent(XContentParser parser) throws IO
throw new ElasticsearchParseException("index shard was not set");
}
snapshotShardFailure.shardId = new ShardId(index, index_uuid, shardId);
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
// Some old snapshot might still have null in shard failure reasons
if (snapshotShardFailure.reason == null) {
snapshotShardFailure.reason = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I don't quite understand: Why will it happily parse the reason field if it is null? Currently we parse it using text(), shouldn't that fail and should we use textOrNull() instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it should be textOrNull().

}
return snapshotShardFailure;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (!status.state().completed()) {
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe extend the message to "aborted by snapshot deletion"

} else {
shardsBuilder.put(shardEntry.key, status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ public void testWaitingIndices() {
// test more than one waiting shard in an index
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test exactly one waiting shard in an index
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test no waiting shards in an index
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
indices, System.currentTimeMillis(), randomLong(), shards.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,13 @@ public static String blockMasterFromFinalizingSnapshot(final String repositoryNa
return masterName;
}

public static String blockMasterFromCreatingSnapshot(final String repositoryName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method name made me think that it would prevent the master from creating a snapshot at all. Maybe we can call it something along the lines of "blockMasterFromFinalizingSnapshot"?

final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true);
return masterName;
}

public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
for(String node : internalCluster().nodesInclude(indexName)) {
((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,67 @@ public void testMasterShutdownDuringSnapshot() throws Exception {
assertEquals(0, snapshotInfo.failedShards());
}


public void testMasterAndDataShutdownDuringSnapshot() throws Exception {
logger.info("--> starting three master nodes and two data nodes");
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNodes(2);

final Client client = client();

logger.info("--> creating repository");
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));

assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
.put("number_of_replicas", 0)));
ensureGreen();

logger.info("--> indexing some data");
final int numdocs = randomIntBetween(10, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test-idx", "type1", Integer.toString(i)).setSource("field1", "bar " + i);
}
indexRandom(true, builders);
flushAndRefresh();

final int numberOfShards = getNumShards("test-idx").numPrimaries;
logger.info("number of shards: {}", numberOfShards);

final String masterNode = blockMasterFromCreatingSnapshot("test-repo");
final String dataNode = blockNodeWithIndex("test-repo", "test-idx");

dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();

logger.info("--> stopping data node {}", dataNode);
stopNode(dataNode);
logger.info("--> stopping master node {} ", masterNode);
internalCluster().stopCurrentMasterNode();

logger.info("--> wait until the snapshot is done");

assertBusy(() -> {
GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertTrue(snapshotInfo.state().completed());
}, 1, TimeUnit.MINUTES);

logger.info("--> verify that snapshot was partial");

GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
assertThat(snapshotInfo.failedShards(), greaterThan(0));
for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
assertNotNull(failure.reason());
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281")
public void testMasterShutdownDuringFailedSnapshot() throws Exception {
logger.info("--> starting two master nodes and two data nodes");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2252,9 +2252,9 @@ public void testDeleteOrphanSnapshot() throws Exception {
public ClusterState execute(ClusterState currentState) {
// Simulate orphan snapshot
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
List<Entry> entries = new ArrayList<>();
entries.add(new Entry(new Snapshot(repositoryName,
createSnapshotResponse.getSnapshotInfo().snapshotId()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ private Entry randomSnapshot() {
ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10));
String nodeId = randomAlphaOfLength(10);
State shardState = randomFrom(State.values());
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
shardState.failed() ? randomAlphaOfLength(10) : null));
}
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public long getFailureCount() {
* finalization of a snapshot, while permitting other IO operations to proceed unblocked. */
private volatile boolean blockOnWriteIndexFile;

/** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a died master node */
private volatile boolean blockAndFailOnWriteSnapFile;

private volatile boolean atomicMove;

private volatile boolean blocked = false;
Expand All @@ -118,6 +121,7 @@ public MockRepository(RepositoryMetaData metadata, Environment environment,
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
randomPrefix = metadata.settings().get("random", "default");
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
Expand Down Expand Up @@ -168,13 +172,18 @@ public synchronized void unblock() {
blockOnControlFiles = false;
blockOnInitialization = false;
blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false;
this.notifyAll();
}

public void blockOnDataFiles(boolean blocked) {
blockOnDataFiles = blocked;
}

public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
blockAndFailOnWriteSnapFile = blocked;
}

public void setBlockOnWriteIndexFile(boolean blocked) {
blockOnWriteIndexFile = blocked;
}
Expand All @@ -187,7 +196,8 @@ private synchronized boolean blockExecution() {
logger.debug("Blocking execution");
boolean wasBlocked = false;
try {
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile) {
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile) {
blocked = true;
this.wait();
wasBlocked = true;
Expand Down Expand Up @@ -266,6 +276,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
throw new IOException("Random IOException");
} else if (blockOnControlFiles) {
blockExecutionAndMaybeWait(blobName);
} else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
blockExecutionAndFail(blobName);
}
}
}
Expand All @@ -283,6 +295,15 @@ private void blockExecutionAndMaybeWait(final String blobName) {
}
}

/**
* Blocks an I/O operation on the blob fails and throws an exception when unblocked
*/
private void blockExecutionAndFail(final String blobName) throws IOException {
logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
blockExecution();
throw new IOException("exception after block");
}

MockBlobContainer(BlobContainer delegate) {
super(delegate);
}
Expand Down