From 71902d8711d41660d101720fe478230b2b4006eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Francisco=20Fern=C3=A1ndez=20Casta=C3=B1o?= Date: Thu, 2 Sep 2021 13:56:53 +0200 Subject: [PATCH] Fix FollowingEngineTests#testOptimizeMultipleVersions (#77170) In certain concurrent indexing scenarios where there are deletes executed and then a new indexing operation, the following engine considers those as updates breaking one of the assumed invariants. Closes #72527 Backport of #75583 --- .../index/engine/InternalEngine.java | 10 +- .../ccr/index/engine/FollowingEngine.java | 19 ++- .../index/engine/FollowingEngineTests.java | 108 +++++++++++++++++- 3 files changed, 130 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a9c32e1b2f310..8939351af44f5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -880,7 +880,11 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { return doGenerateSeqNoForOperation(operation); } - protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { + advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); + } + + protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) { advanceMaxSeqNoOfUpdatesOrDeletes(seqNo); } @@ -948,7 +952,7 @@ public IndexResult index(Index index) throws IOException { final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false; if (toAppend == false) { - advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo()); + advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo()); } } else { markSeqNoAsSeen(index.seqNo()); @@ -1332,7 +1336,7 @@ public DeleteResult delete(Delete delete) throws IOException { delete.primaryTerm(), delete.version(), delete.versionType(), 
delete.origin(), delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm()); - advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo()); + advanceMaxSeqNoOfDeletesOnPrimary(delete.seqNo()); } else { markSeqNoAsSeen(delete.seqNo()); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 45927cca81817..772d9c6c86493 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -35,7 +35,7 @@ /** * An engine implementation for following shards. */ -public final class FollowingEngine extends InternalEngine { +public class FollowingEngine extends InternalEngine { /** @@ -118,14 +118,27 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) { } @Override - protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) { + protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) { if (Assertions.ENABLED) { final long localCheckpoint = getProcessedLocalCheckpoint(); final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes(); assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo : "maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo; } - super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code + + super.advanceMaxSeqNoOfDeletesOnPrimary(seqNo); + } + + @Override + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { + // In some scenarios it is possible to advance maxSeqNoOfUpdatesOrDeletes over the leader + // maxSeqNoOfUpdatesOrDeletes, since in this engine (effectively it is a replica) we don't check if the previous version + // was a delete and it's possible to consider it as an update, 
advancing the max sequence number over the leader + // maxSeqNoOfUpdatesOrDeletes. + // We conservatively advance the seqno in this case, accepting a minor performance hit in this edge case. + + // See FollowingEngineTests#testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details. + super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 9b381da90cd2c..2fafd4cd53ec0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -18,9 +18,9 @@ import org.elasticsearch.common.CheckedBiFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.core.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; @@ -32,8 +32,10 @@ import org.elasticsearch.index.engine.EngineTestCase; import org.elasticsearch.index.engine.InternalEngine; import org.elasticsearch.index.engine.TranslogHandler; +import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.RetentionLeases; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -56,6 +58,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -294,12 +297,34 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin) { final long version = randomBoolean() ? 1 : randomNonNegativeLong(); + return indexForFollowing(id, seqNo, origin, version); + } + + private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) { final ParsedDocument parsedDocument = EngineTestCase.createParsedDoc(id, null); return new Engine.Index(EngineTestCase.newUid(parsedDocument), parsedDocument, seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, origin, System.currentTimeMillis(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, randomBoolean(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0); } + private Engine.Delete deleteForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) { + final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null); + long startTime = System.nanoTime(); + final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id)); + return new Engine.Delete(parsedDoc.type(), + parsedDoc.id(), + uid, + seqNo, + primaryTerm.get(), + version, + VersionType.EXTERNAL, + origin, + startTime, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ); + } + private Engine.Index indexForPrimary(String id) { final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null); return new Engine.Index(EngineTestCase.newUid(parsedDoc), primaryTerm.get(), parsedDoc); @@ -453,6 +478,87 @@ public void testOptimizeSingleDocConcurrently() throws Exception { }); } + public void testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates() throws Exception { + // See #72527 for more details + Settings followerSettings = 
Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + .put("index.version.created", Version.CURRENT) + .put("index.xpack.ccr.following_index", true) + .build(); + + IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build(); + IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, Settings.EMPTY); + try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) { + EngineConfig followerConfig = + engineConfig(shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry()); + followerStore.createEmpty(); + String translogUuid = Translog.createEmptyTranslog(followerConfig.getTranslogConfig().getTranslogPath(), + SequenceNumbers.NO_OPS_PERFORMED, + shardId, + 1L + ); + followerStore.associateIndexWithNewTranslog(translogUuid); + CountDownLatch concurrentDeleteOpLatch = new CountDownLatch(1); + final long indexNewDocWithSameIdSeqNo = 4; + FollowingEngine followingEngine = new FollowingEngine(followerConfig) { + @Override + protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) { + if (seqNo == indexNewDocWithSameIdSeqNo) { + // wait until the concurrent delete finishes meaning that processedLocalCheckpoint == maxSeqNoOfUpdatesOrDeletes + try { + concurrentDeleteOpLatch.await(); + assertThat(getProcessedLocalCheckpoint(), equalTo(getMaxSeqNoOfUpdatesOrDeletes())); + } catch (Exception exception) { + throw new RuntimeException(exception); + } + } + super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo); + } + }; + TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), followerConfig.getIndexSettings()); + followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE); + try { + final long leaderMaxSeqNoOfUpdatesOnPrimary = 3; + followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(leaderMaxSeqNoOfUpdatesOnPrimary); + + followingEngine.index(indexForFollowing("1", 0, 
Engine.Operation.Origin.PRIMARY, 1)); + followingEngine.delete(deleteForFollowing("1", 1, Engine.Operation.Origin.PRIMARY, 2)); + followingEngine.index(indexForFollowing("2", 2, Engine.Operation.Origin.PRIMARY, 1)); + assertThat(followingEngine.getProcessedLocalCheckpoint(), equalTo(2L)); + + CyclicBarrier barrier = new CyclicBarrier(3); + Thread thread1 = new Thread(() -> { + try { + barrier.await(); + followingEngine.delete(deleteForFollowing("2", 3, Engine.Operation.Origin.PRIMARY, 2)); + concurrentDeleteOpLatch.countDown(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread thread2 = new Thread(() -> { + try { + barrier.await(); + followingEngine.index(indexForFollowing("1", indexNewDocWithSameIdSeqNo, Engine.Operation.Origin.PRIMARY, 3)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + thread1.start(); + thread2.start(); + barrier.await(); + thread1.join(); + thread2.join(); + + assertThat(followingEngine.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leaderMaxSeqNoOfUpdatesOnPrimary)); + } finally { + followingEngine.close(); + } + } + } + private void runFollowTest(CheckedBiConsumer task) throws Exception { final CheckedBiConsumer wrappedTask = (leader, follower) -> { Thread[] threads = new Thread[between(1, 8)];