diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
index 53ac626820892..f765cdcf7bf8d 100644
--- a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
+++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java
@@ -253,6 +253,8 @@ public ShardSnapshotStatus(String nodeId, State state, String reason) {
             this.nodeId = nodeId;
             this.state = state;
             this.reason = reason;
+            // If the state is failed we have to have a reason for this failure
+            assert state.failed() == false || reason != null;
         }

         public ShardSnapshotStatus(StreamInput in) throws IOException {
@@ -413,9 +415,17 @@ public SnapshotsInProgress(StreamInput in) throws IOException {
             int shards = in.readVInt();
             for (int j = 0; j < shards; j++) {
                 ShardId shardId = ShardId.readShardId(in);
-                String nodeId = in.readOptionalString();
-                State shardState = State.fromValue(in.readByte());
-                builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState));
+                // TODO: Change this to an appropriate version when it's backported
+                if (in.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+                    builder.put(shardId, new ShardSnapshotStatus(in));
+                } else {
+                    String nodeId = in.readOptionalString();
+                    State shardState = State.fromValue(in.readByte());
+                    // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+                    // Some old snapshots might still have null in shard failure reasons
+                    String reason = shardState.failed() ? "" : null;
+                    builder.put(shardId, new ShardSnapshotStatus(nodeId, shardState, reason));
+                }
             }
             long repositoryStateId = UNDEFINED_REPOSITORY_STATE_ID;
             if (in.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
@@ -449,8 +459,13 @@ public void writeTo(StreamOutput out) throws IOException {
             out.writeVInt(entry.shards().size());
             for (ObjectObjectCursor shardEntry : entry.shards()) {
                 shardEntry.key.writeTo(out);
-                out.writeOptionalString(shardEntry.value.nodeId());
-                out.writeByte(shardEntry.value.state().value());
+                // TODO: Change this to an appropriate version when it's backported
+                if (out.getVersion().onOrAfter(Version.V_6_0_0_beta1)) {
+                    shardEntry.value.writeTo(out);
+                } else {
+                    out.writeOptionalString(shardEntry.value.nodeId());
+                    out.writeByte(shardEntry.value.state().value());
+                }
             }
             if (out.getVersion().onOrAfter(REPOSITORY_ID_INTRODUCED_VERSION)) {
                 out.writeLong(entry.repositoryStateId);
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardFailure.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardFailure.java
index 7facd49088f13..b7b0561db0b7f 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardFailure.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardFailure.java
@@ -62,6 +62,7 @@ public SnapshotShardFailure(@Nullable String nodeId, ShardId shardId, String rea
         this.nodeId = nodeId;
         this.shardId = shardId;
         this.reason = reason;
+        assert reason != null;
         status = RestStatus.INTERNAL_SERVER_ERROR;
     }

@@ -192,7 +193,9 @@ public static SnapshotShardFailure fromXContent(XContentParser parser) throws IO
             } else if ("node_id".equals(currentFieldName)) {
                 snapshotShardFailure.nodeId = parser.text();
             } else if ("reason".equals(currentFieldName)) {
-                snapshotShardFailure.reason = parser.text();
+                // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+                // Some old snapshots might still have null in shard failure reasons
+                snapshotShardFailure.reason = parser.textOrNull();
             } else if ("shard_id".equals(currentFieldName)) {
                 shardId = parser.intValue();
             } else if ("status".equals(currentFieldName)) {
@@ -215,6 +218,11 @@ public static SnapshotShardFailure fromXContent(XContentParser parser) throws IO
             throw new ElasticsearchParseException("index shard was not set");
         }
         snapshotShardFailure.shardId = new ShardId(index, index_uuid, shardId);
+        // Workaround for https://github.com/elastic/elasticsearch/issues/25878
+        // Some old snapshots might still have null in shard failure reasons
+        if (snapshotShardFailure.reason == null) {
+            snapshotShardFailure.reason = "";
+        }
         return snapshotShardFailure;
     }
diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index c8dfb7732815a..037db4d5caf66 100644
--- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -1128,7 +1128,8 @@ public ClusterState execute(ClusterState currentState) throws Exception {
                 for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) {
                     ShardSnapshotStatus status = shardEntry.value;
                     if (!status.state().completed()) {
-                        shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED));
+                        shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
+                            "aborted by snapshot deletion"));
                     } else {
                         shardsBuilder.put(shardEntry.key, status);
                     }
diff --git a/core/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java b/core/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java
index 4d1a1a6e588e0..fcf70909b31d6 100644
--- a/core/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java
@@ -57,12 +57,12 @@ public void testWaitingIndices() {
         // test more than one waiting shard in an index
         shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
         shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
-        shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+        shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
         // test exactly one waiting shard in an index
         shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
-        shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+        shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
         // test no waiting shards in an index
-        shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState()));
+        shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
         Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT, indices, System.currentTimeMillis(),
             randomLong(), shards.build());
diff --git a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 6a1b70637e49e..45110ee6a2d15 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -128,13 +128,20 @@ public SnapshotInfo waitForCompletion(String repository, String snapshotName, Ti
         return null;
     }

-    public static String blockMasterFromFinalizingSnapshot(final String repositoryName) {
+    public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
         final String masterName = internalCluster().getMasterName();
         ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
             .repository(repositoryName)).setBlockOnWriteIndexFile(true);
         return masterName;
     }

+    public static String blockMasterFromFinalizingSnapshotOnSnapFile(final String repositoryName) {
+        final String masterName = internalCluster().getMasterName();
+        ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
+            .repository(repositoryName)).setBlockAndFailOnWriteSnapFiles(true);
+        return masterName;
+    }
+
     public static String blockNodeWithIndex(final String repositoryName, final String indexName) {
         for(String node : internalCluster().nodesInclude(indexName)) {
             ((MockRepository)internalCluster().getInstance(RepositoriesService.class, node).repository(repositoryName))
diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
index 8ca3714595283..3cf3c3a23639d 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java
@@ -767,6 +767,67 @@ public void testMasterShutdownDuringSnapshot() throws Exception {
         assertEquals(0, snapshotInfo.failedShards());
     }

+
+    public void testMasterAndDataShutdownDuringSnapshot() throws Exception {
+        logger.info("--> starting three master nodes and two data nodes");
+        internalCluster().startMasterOnlyNodes(3);
+        internalCluster().startDataOnlyNodes(2);
+
+        final Client client = client();
+
+        logger.info("--> creating repository");
+        assertAcked(client.admin().cluster().preparePutRepository("test-repo")
+            .setType("mock").setSettings(Settings.builder()
+                .put("location", randomRepoPath())
+                .put("compress", randomBoolean())
+                .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
+
+        assertAcked(prepareCreate("test-idx", 0, Settings.builder().put("number_of_shards", between(1, 20))
+            .put("number_of_replicas", 0)));
+        ensureGreen();
+
+        logger.info("--> indexing some data");
+        final int numdocs = randomIntBetween(10, 100);
+        IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
+        for (int i = 0; i < builders.length; i++) {
+            builders[i] = client().prepareIndex("test-idx", "type1", Integer.toString(i)).setSource("field1", "bar " + i);
+        }
+        indexRandom(true, builders);
+        flushAndRefresh();
+
+        final int numberOfShards = getNumShards("test-idx").numPrimaries;
+        logger.info("number of shards: {}", numberOfShards);
+
+        final String masterNode = blockMasterFromFinalizingSnapshotOnSnapFile("test-repo");
+        final String dataNode = blockNodeWithIndex("test-repo", "test-idx");
+
+        dataNodeClient().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
+
+        logger.info("--> stopping data node {}", dataNode);
+        stopNode(dataNode);
+        logger.info("--> stopping master node {} ", masterNode);
+        internalCluster().stopCurrentMasterNode();
+
+        logger.info("--> wait until the snapshot is done");
+
+        assertBusy(() -> {
+            GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
+            SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+            assertTrue(snapshotInfo.state().completed());
+        }, 1, TimeUnit.MINUTES);
+
+        logger.info("--> verify that snapshot was partial");
+
+        GetSnapshotsResponse snapshotsStatusResponse = client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap").get();
+        SnapshotInfo snapshotInfo = snapshotsStatusResponse.getSnapshots().get(0);
+        assertEquals(SnapshotState.PARTIAL, snapshotInfo.state());
+        assertNotEquals(snapshotInfo.totalShards(), snapshotInfo.successfulShards());
+        assertThat(snapshotInfo.failedShards(), greaterThan(0));
+        for (SnapshotShardFailure failure : snapshotInfo.shardFailures()) {
+            assertNotNull(failure.reason());
+        }
+    }
+
     @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/25281")
     public void testMasterShutdownDuringFailedSnapshot() throws Exception {
         logger.info("--> starting two master nodes and two data nodes");
@@ -800,7 +861,7 @@ public void testMasterShutdownDuringFailedSnapshot() throws Exception {
             assertEquals(ClusterHealthStatus.RED, client().admin().cluster().prepareHealth().get().getStatus()),
             30, TimeUnit.SECONDS);

-        final String masterNode = blockMasterFromFinalizingSnapshot("test-repo");
+        final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile("test-repo");

         logger.info("--> snapshot");
         client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
index 6e778181590e5..601ca1b8210d3 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java
@@ -2252,9 +2252,9 @@ public void testDeleteOrphanSnapshot() throws Exception {
             public ClusterState execute(ClusterState currentState) {
                 // Simulate orphan snapshot
                 ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder();
-                shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED));
-                shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED));
-                shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED));
+                shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
+                shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
+                shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
                 List entries = new ArrayList<>();
                 entries.add(new Entry(new Snapshot(repositoryName, createSnapshotResponse.getSnapshotInfo().snapshotId()),
diff --git a/core/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/core/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
index edf3ada587f31..f376e205d1975 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java
@@ -66,7 +66,8 @@ private Entry randomSnapshot() {
                 ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10));
                 String nodeId = randomAlphaOfLength(10);
                 State shardState = randomFrom(State.values());
-                builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState));
+                builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
+                    shardState.failed() ? randomAlphaOfLength(10) : null));
             }
             ImmutableOpenMap shards = builder.build();
             return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards);
diff --git a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 74b59ae5da508..3a5b068cd8977 100644
--- a/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/core/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -104,6 +104,9 @@ public long getFailureCount() {
      * finalization of a snapshot, while permitting other IO operations to proceed unblocked.
      */
     private volatile boolean blockOnWriteIndexFile;
+    /** Allows blocking on writing the snapshot file at the end of snapshot creation to simulate a dead master node */
+    private volatile boolean blockAndFailOnWriteSnapFile;
+
     private volatile boolean atomicMove;

     private volatile boolean blocked = false;
@@ -118,6 +121,7 @@ public MockRepository(RepositoryMetaData metadata, Environment environment,
         blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
         blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
         blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
+        blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
         randomPrefix = metadata.settings().get("random", "default");
         waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
         atomicMove = metadata.settings().getAsBoolean("atomic_move", true);
@@ -168,6 +172,7 @@ public synchronized void unblock() {
         blockOnControlFiles = false;
         blockOnInitialization = false;
         blockOnWriteIndexFile = false;
+        blockAndFailOnWriteSnapFile = false;
         this.notifyAll();
     }

@@ -175,6 +180,10 @@ public void blockOnDataFiles(boolean blocked) {
         blockOnDataFiles = blocked;
     }

+    public void setBlockAndFailOnWriteSnapFiles(boolean blocked) {
+        blockAndFailOnWriteSnapFile = blocked;
+    }
+
     public void setBlockOnWriteIndexFile(boolean blocked) {
         blockOnWriteIndexFile = blocked;
     }
@@ -187,7 +196,8 @@ private synchronized boolean blockExecution() {
         logger.debug("Blocking execution");
         boolean wasBlocked = false;
         try {
-            while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile) {
+            while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
+                blockAndFailOnWriteSnapFile) {
                 blocked = true;
                 this.wait();
                 wasBlocked = true;
@@ -266,6 +276,8 @@ private void maybeIOExceptionOrBlock(String blobName) throws IOException {
                     throw new IOException("Random IOException");
                 } else if (blockOnControlFiles) {
                     blockExecutionAndMaybeWait(blobName);
+                } else if (blobName.startsWith("snap-") && blockAndFailOnWriteSnapFile) {
+                    blockExecutionAndFail(blobName);
                 }
             }
         }
@@ -283,6 +295,15 @@ private void blockExecutionAndMaybeWait(final String blobName) {
         }
     }

+    /**
+     * Blocks an I/O operation on the blob and fails it with an exception when unblocked
+     */
+    private void blockExecutionAndFail(final String blobName) throws IOException {
+        logger.info("blocking I/O operation for file [{}] at path [{}]", blobName, path());
+        blockExecution();
+        throw new IOException("exception after block");
+    }
+
     MockBlobContainer(BlobContainer delegate) {
         super(delegate);
     }
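
The SnapshotsInProgress change above follows the usual version-gated wire-format pattern: the writer only emits the new failure-reason field when the destination node is on a version that can read it, and the reader falls back to the old two-field layout, synthesizing an empty reason for failed shards, when the sender is older. The standalone sketch below illustrates that pattern in isolation, under stated assumptions: Status, StatusWire, REASON_ADDED_VERSION, FIRST_FAILED_STATE and the numeric versions are hypothetical stand-ins for ShardSnapshotStatus, StreamInput/StreamOutput and Version.V_6_0_0_beta1, not actual Elasticsearch APIs.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for ShardSnapshotStatus: a node id, a state byte and an optional failure reason. */
record Status(String nodeId, byte state, String reason) {}

final class StatusWire {
    /** Stand-in for Version.V_6_0_0_beta1, the first version whose wire format carries the reason. */
    static final int REASON_ADDED_VERSION = 6;
    /** In this sketch, any state byte >= this value counts as "failed". */
    static final byte FIRST_FAILED_STATE = 3;

    /** The writer picks the format the receiver understands, like writeTo() in the diff above. */
    static void write(Status s, DataOutputStream out, int receiverVersion) throws IOException {
        out.writeUTF(s.nodeId());
        out.writeByte(s.state());
        if (receiverVersion >= REASON_ADDED_VERSION) {
            out.writeUTF(s.reason() == null ? "" : s.reason()); // new field, only sent to new nodes
        }
    }

    /** The reader picks the format the sender produced, like the StreamInput constructor above. */
    static Status read(DataInputStream in, int senderVersion) throws IOException {
        String nodeId = in.readUTF();
        byte state = in.readByte();
        if (senderVersion >= REASON_ADDED_VERSION) {
            return new Status(nodeId, state, in.readUTF());
        }
        // Old senders never shipped a reason; synthesize "" for failed states so the
        // "failed implies non-null reason" invariant still holds (mirrors the workaround above).
        String reason = state >= FIRST_FAILED_STATE ? "" : null;
        return new Status(nodeId, state, reason);
    }

    public static void main(String[] args) throws IOException {
        Status original = new Status("node-1", (byte) 4, "disk full");
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        write(original, new DataOutputStream(bytes), 5); // pretend the receiver is an old (v5) node
        Status roundTripped = read(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), 5);
        System.out.println(roundTripped); // the reason is lost on the old format, but never null for a failed state
    }
}

The property this preserves, and the one the new assert in ShardSnapshotStatus enforces, is that a failed shard state is never paired with a null failure reason, regardless of which wire format or snapshot metadata produced it.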