diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index d7910ae109b15..9fe76fbc78fa5 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1400,6 +1400,13 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             recoveryState.getTranslog().totalLocal(0);
             return globalCheckpoint + 1;
         }
+        if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE ||
+            IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
+            logger.trace("skip local recovery as the index was closed or not allowed to write; safe commit {} global checkpoint {}",
+                safeCommit.get(), globalCheckpoint);
+            recoveryState.getTranslog().totalLocal(0);
+            return safeCommit.get().localCheckpoint + 1;
+        }
         try {
             final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
                 recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
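
The hunk above is the functional change: a shard of a closed index, or of an index carrying a write block, cannot have acknowledged writes beyond its safe commit, so local recovery returns the position right after the safe commit's local checkpoint and replays no translog operations, instead of recovering locally up to the global checkpoint. The snippet below is a minimal, self-contained model of that decision in plain Java (hypothetical names, not the Elasticsearch API), included only to make the two starting points explicit.

// Simplified stand-in for the guard added above; hypothetical names, not the Elasticsearch API.
public class LocalRecoveryDecision {

    enum IndexState { OPEN, CLOSE }

    // A closed or write-blocked index skips local translog replay and resumes right after the
    // safe commit; an open, writable index would replay operations up to the global checkpoint
    // (the replay itself is elided here).
    static long recoveryStartingSeqNo(IndexState state, boolean writeBlocked,
                                      long safeCommitLocalCheckpoint, long globalCheckpoint) {
        if (state == IndexState.CLOSE || writeBlocked) {
            return safeCommitLocalCheckpoint + 1; // no local replay at all
        }
        return globalCheckpoint + 1; // stand-in for the replay path of the real method
    }

    public static void main(String[] args) {
        System.out.println(recoveryStartingSeqNo(IndexState.CLOSE, false, 42, 100)); // prints 43
        System.out.println(recoveryStartingSeqNo(IndexState.OPEN, false, 42, 100));  // prints 101
    }
}
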
diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
index 7e8f18dd005fc..f4949804e3079 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
@@ -40,7 +40,8 @@ public void testRecoverFromNoOp() throws IOException {
         indexShard.close("test", true);
 
         final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
+        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
+            indexShard.indexSettings().getIndexMetaData(), NoOpEngine::new);
         recoverShardFromStore(primary);
         assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
         assertEquals(nbDocs, primary.docStats().getCount());
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 6c59819c76705..69f0078c3f672 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -4106,7 +4106,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
         final ShardRouting replicaRouting = shard.routingEntry();
         ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
             ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
-        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
+        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetaData(),
             engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
                 @Override
                 protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 4a0e893cdc051..9fcbce104967e 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -26,15 +26,18 @@
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
-import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -134,23 +137,24 @@ public void testWriteFileChunksConcurrently() throws Exception {
         closeShards(sourceShard, targetShard);
     }
 
-    public void testPrepareIndexForPeerRecovery() throws Exception {
-        CheckedFunction<IndexShard, Long, IOException> populateData = shard -> {
-            List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
-            Randomness.shuffle(seqNos);
-            for (long seqNo : seqNos) {
-                shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
-                    shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
-                if (randomInt(100) < 5) {
-                    shard.flush(new FlushRequest().waitIfOngoing(true));
-                }
+    private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
+        List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
+        Randomness.shuffle(seqNos);
+        for (long seqNo : seqNos) {
+            shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
+                shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
+            if (randomInt(100) < 5) {
+                shard.flush(new FlushRequest().waitIfOngoing(true));
             }
-            shard.sync();
-            long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
-            shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
-            shard.sync();
-            return globalCheckpoint;
-        };
+        }
+        shard.sync();
+        long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
+        shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
+        shard.sync();
+        return shard.seqNoStats();
+    }
+
+    public void testPrepareIndexForPeerRecovery() throws Exception {
         DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
             Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
 
@@ -166,7 +170,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
 
         // good copy
         shard = newStartedShard(false);
-        long globalCheckpoint = populateData.apply(shard);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
         assertTrue(safeCommit.isPresent());
         int expectedTotalLocal = 0;
@@ -191,7 +195,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         // corrupted copy
         shard = newStartedShard(false);
         if (randomBoolean()) {
-            populateData.apply(shard);
+            populateRandomData(shard);
         }
         shard.store().markStoreCorrupted(new IOException("test"));
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
@@ -206,7 +210,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
 
         // copy with truncated translog
         shard = newStartedShard(false);
-        globalCheckpoint = populateData.apply(shard);
+        globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
             RecoverySource.PeerRecoverySource.INSTANCE));
         String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
@@ -226,4 +230,32 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
     }
+
+    public void testClosedIndexSkipsLocalRecovery() throws Exception {
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        IndexShard shard = newStartedShard(false);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
+        Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
+        assertTrue(safeCommit.isPresent());
+        final IndexMetaData indexMetaData;
+        if (randomBoolean()) {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(shard.indexSettings().getSettings())
+                .state(IndexMetaData.State.CLOSE).build();
+        } else {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(Settings.builder().put(shard.indexSettings().getSettings())
+                    .put(IndexMetaData.SETTING_BLOCKS_WRITE, true)).build();
+        }
+        IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE), indexMetaData, NoOpEngine::new);
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        closeShards(replica);
+    }
 }
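
The populateRandomData helper introduced above replaces the old populateData lambda: it applies a fixed range of sequence numbers to the replica in shuffled order (replica operations may arrive out of order), picks a random global checkpoint at or below the local checkpoint, and now returns the full SeqNoStats so callers such as testClosedIndexSkipsLocalRecovery can read whichever value they need. The plain-Java sketch below (hypothetical code outside the test framework) only makes those two random choices concrete.

import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class ShuffledSeqNoExample {
    public static void main(String[] args) {
        Random random = new Random();
        // the same 100 sequence numbers a primary would assign, applied in arbitrary order
        List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
        Collections.shuffle(seqNos, random);
        long localCheckpoint = 99;                       // every seqNo in 0..99 ends up applied
        long globalCheckpoint = random.nextInt(101) - 1; // anywhere in [-1, 99]; -1 plays the role of NO_OPS_PERFORMED
        System.out.println("first applied seqNo=" + seqNos.get(0)
            + " localCheckpoint=" + localCheckpoint
            + " globalCheckpoint=" + globalCheckpoint);
    }
}
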
diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
index 6f618739d0a4f..2125184baef63 100644
--- a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
@@ -86,7 +86,6 @@ protected int maximumNumberOfShards() {
         return 3;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/44855")
     public void testCloseWhileRelocatingShards() throws Exception {
         final String[] indices = new String[randomIntBetween(3, 5)];
         final Map<String, Long> docsPerIndex = new HashMap<>();
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index ea966d6fd6f8f..b7b34bb461104 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -427,23 +427,24 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
      * @param listeners new listerns to use for the newly created shard
      */
     protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
-        return reinitShard(current, routing, current.engineFactory, listeners);
+        return reinitShard(current, routing, current.indexSettings.getIndexMetaData(), current.engineFactory, listeners);
     }
 
     /**
     * Takes an existing shard, closes it and starts a new initialing shard at the same location
     *
-    * @param routing the shard routing to use for the newly created shard.
-    * @param listeners new listerns to use for the newly created shard
+    * @param routing       the shard routing to use for the newly created shard.
+    * @param listeners     new listerns to use for the newly created shard
+    * @param indexMetaData the index metadata to use for the newly created shard
     * @param engineFactory the engine factory for the new shard
     */
-    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
+    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory,
                                      IndexingOperationListener... listeners) throws IOException {
         closeShards(current);
         return newShard(
             routing,
             current.shardPath(),
-            current.indexSettings().getIndexMetaData(),
+            indexMetaData,
             null,
             null,
             engineFactory,
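
The test-framework change above threads an IndexMetaData argument through reinitShard: the existing overload keeps old call sites compiling by defaulting to the current shard's metadata, while the new overload lets a test reopen the same shard under different metadata, such as a CLOSE state or a write block. The sketch below shows that delegation pattern in plain Java with hypothetical types; it is not the actual test framework.

// Hypothetical types illustrating the overload-with-default pattern used by reinitShard above.
public class ReinitOverloadSketch {

    static final class Metadata {
        final String state;
        Metadata(String state) { this.state = state; }
    }

    static final class Shard {
        final Metadata metadata;
        Shard(Metadata metadata) { this.metadata = metadata; }
    }

    // old-style call sites stay unchanged: the shard's own metadata is the default
    static Shard reinit(Shard current) {
        return reinit(current, current.metadata);
    }

    // new overload: the caller may supply replacement metadata for the reopened shard
    static Shard reinit(Shard current, Metadata newMetadata) {
        return new Shard(newMetadata);
    }

    public static void main(String[] args) {
        Shard open = new Shard(new Metadata("OPEN"));
        System.out.println(reinit(open).metadata.state);                        // OPEN
        System.out.println(reinit(open, new Metadata("CLOSE")).metadata.state); // CLOSE
    }
}
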
diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
index b67258dd9b3d0..d091564c7dc54 100644
--- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
+++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
@@ -3,41 +3,92 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
+
 package org.elasticsearch.index.engine;
 
-import org.elasticsearch.cluster.routing.RecoverySource;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardTestCase;
-
-import java.io.IOException;
-
-import static org.hamcrest.Matchers.equalTo;
-
-public class FrozenIndexRecoveryTests extends IndexShardTestCase {
-
-    /**
-     * Make sure we can recover from a frozen engine
-     */
-    public void testRecoverFromFrozenPrimary() throws IOException {
-        IndexShard indexShard = newStartedShard(true);
-        indexDoc(indexShard, "_doc", "1");
-        indexDoc(indexShard, "_doc", "2");
-        indexDoc(indexShard, "_doc", "3");
-        indexShard.close("test", true);
-        final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
-            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
-        ), FrozenEngine::new);
-        recoverShardFromStore(frozenShard);
-        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
-        assertDocCount(frozenShard, 3);
-
-        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
-        recoverReplica(replica, frozenShard, true);
-        assertDocCount(replica, 3);
-        closeShards(frozenShard, replica);
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
+import org.elasticsearch.xpack.frozen.FrozenIndices;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
+
+public class FrozenIndexRecoveryTests extends ESIntegTestCase {
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(FrozenIndices.class);
+        return plugins;
+    }
+
+    public void testRecoverExistingReplica() throws Exception {
+        final String indexName = "test-recover-existing-replica";
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
+            clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes))
+            .build());
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+        ensureGreen(indexName);
+        if (randomBoolean()) {
+            client().admin().indices().prepareFlush(indexName).get();
+        } else {
+            client().admin().indices().prepareSyncedFlush(indexName).get();
+        }
+        // index more documents while one shard copy is offline
+        internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                Client client = client(dataNodes.get(0));
+                int moreDocs = randomIntBetween(1, 50);
+                for (int i = 0; i < moreDocs; i++) {
+                    client.prepareIndex(indexName, "_doc").setSource("num", i).get();
+                }
+                assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
+                return super.onNodeStopped(nodeName);
+            }
+        });
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), not(empty()));
+            }
+        }
+        internalCluster().fullRestart();
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), empty());
+            }
+        }
     }
 }
diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java
new file mode 100644
index 0000000000000..e3f49fdc2aac3
--- /dev/null
+++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class FrozenIndexShardTests extends IndexShardTestCase {
+
+    /**
+     * Make sure we can recover from a frozen engine
+     */
+    public void testRecoverFromFrozenPrimary() throws IOException {
+        IndexShard indexShard = newStartedShard(true);
+        indexDoc(indexShard, "_doc", "1");
+        indexDoc(indexShard, "_doc", "2");
+        indexDoc(indexShard, "_doc", "3");
+        indexShard.close("test", true);
+        final ShardRouting shardRouting = indexShard.routingEntry();
+        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
+            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
+        ), indexShard.indexSettings().getIndexMetaData(), FrozenEngine::new);
+        recoverShardFromStore(frozenShard);
+        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
+        assertDocCount(frozenShard, 3);
+
+        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
+        recoverReplica(replica, frozenShard, true);
+        assertDocCount(replica, 3);
+        closeShards(frozenShard, replica);
+    }
+}