From 907bb55427516a2190ff829314602aeb472e49fe Mon Sep 17 00:00:00 2001
From: Nhat Nguyen
Date: Tue, 30 Jul 2019 10:45:20 -0400
Subject: [PATCH] Skip local recovery for closed or frozen indices (#44887)

For closed and frozen indices, we should not recover the shard locally up
to the global checkpoint before performing peer recovery, because that copy
might have been offline when the index was closed/frozen.

Relates #43463
Closes #44855
---
 .../elasticsearch/index/shard/IndexShard.java |   7 ++
 .../index/engine/NoOpEngineRecoveryTests.java |   3 +-
 .../index/shard/IndexShardTests.java          |   2 +-
 .../PeerRecoveryTargetServiceTests.java       |  72 ++++++++---
 .../state/CloseWhileRelocatingShardsIT.java   |   1 -
 .../index/shard/IndexShardTestCase.java       |  11 +-
 .../engine/FrozenIndexRecoveryTests.java      | 117 +++++++++++++-----
 .../index/engine/FrozenIndexShardTests.java   |  43 +++++++
 8 files changed, 195 insertions(+), 61 deletions(-)
 create mode 100644 x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java

diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index d7910ae109b15..9fe76fbc78fa5 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1400,6 +1400,13 @@ public long recoverLocallyUpToGlobalCheckpoint() {
             recoveryState.getTranslog().totalLocal(0);
             return globalCheckpoint + 1;
         }
+        if (indexSettings.getIndexMetaData().getState() == IndexMetaData.State.CLOSE ||
+            IndexMetaData.INDEX_BLOCKS_WRITE_SETTING.get(indexSettings.getSettings())) {
+            logger.trace("skip local recovery as the index was closed or not allowed to write; safe commit {} global checkpoint {}",
+                safeCommit.get(), globalCheckpoint);
+            recoveryState.getTranslog().totalLocal(0);
+            return safeCommit.get().localCheckpoint + 1;
+        }
         try {
             final Engine.TranslogRecoveryRunner translogRecoveryRunner = (engine, snapshot) -> {
                 recoveryState.getTranslog().totalLocal(snapshot.totalOperations());
diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
index 7e8f18dd005fc..f4949804e3079 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineRecoveryTests.java
@@ -40,7 +40,8 @@ public void testRecoverFromNoOp() throws IOException {
         indexShard.close("test", true);
 
         final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE), NoOpEngine::new);
+        IndexShard primary = reinitShard(indexShard, initWithSameId(shardRouting, ExistingStoreRecoverySource.INSTANCE),
+            indexShard.indexSettings().getIndexMetaData(), NoOpEngine::new);
         recoverShardFromStore(primary);
         assertEquals(primary.seqNoStats().getMaxSeqNo(), primary.getMaxSeqNoOfUpdatesOrDeletes());
         assertEquals(nbDocs, primary.docStats().getCount());
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 6c59819c76705..69f0078c3f672 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -4106,7 +4106,7 @@ public void testDoNotTrimCommitsWhenOpenReadOnlyEngine() throws Exception {
         final ShardRouting replicaRouting = shard.routingEntry();
         ShardRouting readonlyShardRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), true,
             ShardRoutingState.INITIALIZING, RecoverySource.ExistingStoreRecoverySource.INSTANCE);
-        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting,
+        final IndexShard readonlyShard = reinitShard(shard, readonlyShardRouting, shard.indexSettings.getIndexMetaData(),
             engineConfig -> new ReadOnlyEngine(engineConfig, null, null, true, Function.identity()) {
                 @Override
                 protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(SeqNoStats seqNoStats) {
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
index 4a0e893cdc051..9fcbce104967e 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java
@@ -26,15 +26,18 @@
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RecoverySource;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
-import org.elasticsearch.common.CheckedFunction;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.engine.NoOpEngine;
 import org.elasticsearch.index.mapper.SourceToParse;
+import org.elasticsearch.index.seqno.SeqNoStats;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
@@ -134,23 +137,24 @@ public void testWriteFileChunksConcurrently() throws Exception {
         closeShards(sourceShard, targetShard);
     }
 
-    public void testPrepareIndexForPeerRecovery() throws Exception {
-        CheckedFunction<IndexShard, Long, Exception> populateData = shard -> {
-            List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
-            Randomness.shuffle(seqNos);
-            for (long seqNo : seqNos) {
-                shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
-                    shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
-                if (randomInt(100) < 5) {
-                    shard.flush(new FlushRequest().waitIfOngoing(true));
-                }
+    private SeqNoStats populateRandomData(IndexShard shard) throws IOException {
+        List<Long> seqNos = LongStream.range(0, 100).boxed().collect(Collectors.toList());
+        Randomness.shuffle(seqNos);
+        for (long seqNo : seqNos) {
+            shard.applyIndexOperationOnReplica(seqNo, 1, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false, new SourceToParse(
+                shard.shardId().getIndexName(), "_doc", UUIDs.randomBase64UUID(), new BytesArray("{}"), XContentType.JSON));
+            if (randomInt(100) < 5) {
+                shard.flush(new FlushRequest().waitIfOngoing(true));
             }
-            shard.sync();
-            long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
-            shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
-            shard.sync();
-            return globalCheckpoint;
-        };
+        }
+        shard.sync();
+        long globalCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, shard.getLocalCheckpoint());
+        shard.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
+        shard.sync();
+        return shard.seqNoStats();
+    }
+
+    public void testPrepareIndexForPeerRecovery() throws Exception {
         DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
             Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
 
@@ -166,7 +170,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
 
         // good copy
         shard = newStartedShard(false);
-        long globalCheckpoint = populateData.apply(shard);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
         assertTrue(safeCommit.isPresent());
         int expectedTotalLocal = 0;
@@ -191,7 +195,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         // corrupted copy
         shard = newStartedShard(false);
         if (randomBoolean()) {
-            populateData.apply(shard);
+            populateRandomData(shard);
         }
         shard.store().markStoreCorrupted(new IOException("test"));
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
@@ -206,7 +210,7 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
 
         // copy with truncated translog
        shard = newStartedShard(false);
-        globalCheckpoint = populateData.apply(shard);
+        globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
         replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
             RecoverySource.PeerRecoverySource.INSTANCE));
         String translogUUID = Translog.createEmptyTranslog(replica.shardPath().resolveTranslog(), globalCheckpoint,
@@ -226,4 +230,32 @@ public void testPrepareIndexForPeerRecovery() throws Exception {
         assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
         closeShards(replica);
     }
+
+    public void testClosedIndexSkipsLocalRecovery() throws Exception {
+        DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+            Collections.emptyMap(), Collections.emptySet(), Version.CURRENT);
+        IndexShard shard = newStartedShard(false);
+        long globalCheckpoint = populateRandomData(shard).getGlobalCheckpoint();
+        Optional<SequenceNumbers.CommitInfo> safeCommit = shard.store().findSafeIndexCommit(globalCheckpoint);
+        assertTrue(safeCommit.isPresent());
+        final IndexMetaData indexMetaData;
+        if (randomBoolean()) {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(shard.indexSettings().getSettings())
+                .state(IndexMetaData.State.CLOSE).build();
+        } else {
+            indexMetaData = IndexMetaData.builder(shard.indexSettings().getIndexMetaData())
+                .settings(Settings.builder().put(shard.indexSettings().getSettings())
+                    .put(IndexMetaData.SETTING_BLOCKS_WRITE, true)).build();
+        }
+        IndexShard replica = reinitShard(shard, ShardRoutingHelper.initWithSameId(shard.routingEntry(),
+            RecoverySource.PeerRecoverySource.INSTANCE), indexMetaData, NoOpEngine::new);
+        replica.markAsRecovering("for testing", new RecoveryState(replica.routingEntry(), localNode, localNode));
+        replica.prepareForIndexRecovery();
+        assertThat(replica.recoverLocallyUpToGlobalCheckpoint(), equalTo(safeCommit.get().localCheckpoint + 1));
+        assertThat(replica.recoveryState().getTranslog().totalLocal(), equalTo(0));
+        assertThat(replica.recoveryState().getTranslog().recoveredOperations(), equalTo(0));
+        assertThat(replica.getLastKnownGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
+        closeShards(replica);
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
index 6f618739d0a4f..2125184baef63 100644
--- a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java
@@ -86,7 +86,6 @@ protected int maximumNumberOfShards() {
         return 3;
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/44855")
     public void testCloseWhileRelocatingShards() throws Exception {
         final String[] indices = new String[randomIntBetween(3, 5)];
         final Map<String, Long> docsPerIndex = new HashMap<>();
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index ea966d6fd6f8f..b7b34bb461104 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -427,23 +427,24 @@ protected IndexShard reinitShard(IndexShard current, IndexingOperationListener..
      * @param listeners new listerns to use for the newly created shard
      */
     protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexingOperationListener... listeners) throws IOException {
-        return reinitShard(current, routing, current.engineFactory, listeners);
+        return reinitShard(current, routing, current.indexSettings.getIndexMetaData(), current.engineFactory, listeners);
     }
 
     /**
      * Takes an existing shard, closes it and starts a new initialing shard at the same location
      *
-     * @param routing   the shard routing to use for the newly created shard.
-     * @param listeners new listerns to use for the newly created shard
+     * @param routing       the shard routing to use for the newly created shard.
+     * @param listeners     new listerns to use for the newly created shard
+     * @param indexMetaData the index metadata to use for the newly created shard
      * @param engineFactory the engine factory for the new shard
      */
-    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, EngineFactory engineFactory,
+    protected IndexShard reinitShard(IndexShard current, ShardRouting routing, IndexMetaData indexMetaData, EngineFactory engineFactory,
                                      IndexingOperationListener... listeners) throws IOException {
         closeShards(current);
         return newShard(
             routing,
             current.shardPath(),
-            current.indexSettings().getIndexMetaData(),
+            indexMetaData,
             null,
             null,
             engineFactory,
diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
index b67258dd9b3d0..d091564c7dc54 100644
--- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
+++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexRecoveryTests.java
@@ -3,41 +3,92 @@
  * or more contributor license agreements. Licensed under the Elastic License;
  * you may not use this file except in compliance with the Elastic License.
  */
+
 package org.elasticsearch.index.engine;
 
-import org.elasticsearch.cluster.routing.RecoverySource;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardTestCase;
-
-import java.io.IOException;
-
-import static org.hamcrest.Matchers.equalTo;
-
-public class FrozenIndexRecoveryTests extends IndexShardTestCase {
-
-    /**
-     * Make sure we can recover from a frozen engine
-     */
-    public void testRecoverFromFrozenPrimary() throws IOException {
-        IndexShard indexShard = newStartedShard(true);
-        indexDoc(indexShard, "_doc", "1");
-        indexDoc(indexShard, "_doc", "2");
-        indexDoc(indexShard, "_doc", "3");
-        indexShard.close("test", true);
-        final ShardRouting shardRouting = indexShard.routingEntry();
-        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
-            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
-        ), FrozenEngine::new);
-        recoverShardFromStore(frozenShard);
-        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
-        assertDocCount(frozenShard, 3);
-
-        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
-        recoverReplica(replica, frozenShard, true);
-        assertDocCount(replica, 3);
-        closeShards(frozenShard, replica);
+import org.elasticsearch.common.util.set.Sets;
+import org.elasticsearch.indices.recovery.RecoveryState;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.protocol.xpack.frozen.FreezeRequest;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.xpack.core.frozen.action.FreezeIndexAction;
+import org.elasticsearch.xpack.frozen.FrozenIndices;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
+
+public class FrozenIndexRecoveryTests extends ESIntegTestCase {
+
+    @Override
+    protected boolean addMockInternalEngine() {
+        return false;
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(FrozenIndices.class);
+        return plugins;
+    }
+
+    public void testRecoverExistingReplica() throws Exception {
+        final String indexName = "test-recover-existing-replica";
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        List<String> dataNodes = randomSubsetOf(2, Sets.newHashSet(
+            clusterService().state().nodes().getDataNodes().valuesIt()).stream().map(DiscoveryNode::getName).collect(Collectors.toSet()));
+        createIndex(indexName, Settings.builder()
+            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
+            .put("index.routing.allocation.include._name", String.join(",", dataNodes))
+            .build());
+        indexRandom(randomBoolean(), randomBoolean(), randomBoolean(), IntStream.range(0, randomIntBetween(0, 50))
+            .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("num", n)).collect(toList()));
+        ensureGreen(indexName);
+        if (randomBoolean()) {
+            client().admin().indices().prepareFlush(indexName).get();
+        } else {
+            client().admin().indices().prepareSyncedFlush(indexName).get();
+        }
+        // index more documents while one shard copy is offline
+        internalCluster().restartNode(dataNodes.get(1), new InternalTestCluster.RestartCallback() {
+            @Override
+            public Settings onNodeStopped(String nodeName) throws Exception {
+                Client client = client(dataNodes.get(0));
+                int moreDocs = randomIntBetween(1, 50);
+                for (int i = 0; i < moreDocs; i++) {
+                    client.prepareIndex(indexName, "_doc").setSource("num", i).get();
+                }
+                assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest(indexName)).actionGet());
+                return super.onNodeStopped(nodeName);
+            }
+        });
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), not(empty()));
+            }
+        }
+        internalCluster().fullRestart();
+        ensureGreen(indexName);
+        internalCluster().assertSameDocIdsOnShards();
+        for (RecoveryState recovery : client().admin().indices().prepareRecoveries(indexName).get().shardRecoveryStates().get(indexName)) {
+            if (recovery.getPrimary() == false) {
+                assertThat(recovery.getIndex().fileDetails(), empty());
+            }
+        }
     }
 }
diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java
new file mode 100644
index 0000000000000..e3f49fdc2aac3
--- /dev/null
+++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexShardTests.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.index.engine;
+
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTestCase;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class FrozenIndexShardTests extends IndexShardTestCase {
+
+    /**
+     * Make sure we can recover from a frozen engine
+     */
+    public void testRecoverFromFrozenPrimary() throws IOException {
+        IndexShard indexShard = newStartedShard(true);
+        indexDoc(indexShard, "_doc", "1");
+        indexDoc(indexShard, "_doc", "2");
+        indexDoc(indexShard, "_doc", "3");
+        indexShard.close("test", true);
+        final ShardRouting shardRouting = indexShard.routingEntry();
+        IndexShard frozenShard = reinitShard(indexShard, ShardRoutingHelper.initWithSameId(shardRouting,
+            shardRouting.primary() ? RecoverySource.ExistingStoreRecoverySource.INSTANCE : RecoverySource.PeerRecoverySource.INSTANCE
+        ), indexShard.indexSettings().getIndexMetaData(), FrozenEngine::new);
+        recoverShardFromStore(frozenShard);
+        assertThat(frozenShard.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(frozenShard.seqNoStats().getMaxSeqNo()));
+        assertDocCount(frozenShard, 3);
+
+        IndexShard replica = newShard(false, Settings.EMPTY, FrozenEngine::new);
+        recoverReplica(replica, frozenShard, true);
+        assertDocCount(replica, 3);
+        closeShards(frozenShard, replica);
+    }
+}
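
Note on the core change in IndexShard#recoverLocallyUpToGlobalCheckpoint above: when the index is
closed or carries a write block (as a frozen index does), the shard skips the local translog replay
and reports the sequence number just after the safe commit's local checkpoint as the starting point
for peer recovery. The following is a minimal, self-contained sketch of that decision only; the
LocalRecoveryPlanner, SafeCommit, and startingSeqNo names are hypothetical stand-ins, not the actual
Elasticsearch types or methods.

    /**
     * Sketch of the guard added in this patch; names are illustrative only.
     */
    final class LocalRecoveryPlanner {

        static final class SafeCommit {
            final long localCheckpoint;

            SafeCommit(long localCheckpoint) {
                this.localCheckpoint = localCheckpoint;
            }
        }

        /**
         * Returns the first sequence number that peer recovery must fetch from the primary.
         */
        static long startingSeqNo(boolean indexClosed, boolean hasWriteBlock,
                                  SafeCommit safeCommit, long globalCheckpoint) {
            if (indexClosed || hasWriteBlock) {
                // A closed or frozen copy may have been offline when the index was
                // closed/frozen, so do not replay the local translog; restart from the
                // safe commit and let peer recovery fill in the rest.
                return safeCommit.localCheckpoint + 1;
            }
            // Otherwise the shard first replays its local translog up to the global
            // checkpoint, so peer recovery only needs operations above it (assuming
            // the local replay succeeds).
            return globalCheckpoint + 1;
        }
    }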