Skip to content

Commit

Permalink
Fix FollowingEngineTests#testOptimizeMultipleVersions (elastic#77170)
Browse files Browse the repository at this point in the history
In certain concurrent indexing scenarios, where a delete is executed
and is then followed by a new indexing operation, the following engine
treats the new operation as an update, breaking one of its assumed invariants.

Closes elastic#72527
Backport of elastic#75583
  • Loading branch information
fcofdez authored Sep 2, 2021
1 parent 7ba942d commit 71902d8
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,11 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
return doGenerateSeqNoForOperation(operation);
}

protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
/**
 * Hook invoked from {@code index(Index)} for an index operation that is not a plain append
 * (i.e. when {@code toAppend == false}).
 * The default implementation simply forwards to {@code advanceMaxSeqNoOfUpdatesOrDeletes(long)}.
 * It is {@code protected} so that subclasses can customise how updates are handled separately
 * from deletes.
 *
 * @param seqNo the sequence number of the index operation
 */
protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
}

/**
 * Hook invoked from {@code delete(Delete)} for a delete operation.
 * The default implementation simply forwards to {@code advanceMaxSeqNoOfUpdatesOrDeletes(long)}.
 * It is {@code protected} so that subclasses can customise how deletes are handled separately
 * from updates.
 *
 * @param seqNo the sequence number of the delete operation
 */
protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) {
advanceMaxSeqNoOfUpdatesOrDeletes(seqNo);
}

Expand Down Expand Up @@ -948,7 +952,7 @@ public IndexResult index(Index index) throws IOException {

final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
if (toAppend == false) {
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
advanceMaxSeqNoOfUpdatesOnPrimary(index.seqNo());
}
} else {
markSeqNoAsSeen(index.seqNo());
Expand Down Expand Up @@ -1332,7 +1336,7 @@ public DeleteResult delete(Delete delete) throws IOException {
delete.primaryTerm(), delete.version(), delete.versionType(), delete.origin(), delete.startTime(),
delete.getIfSeqNo(), delete.getIfPrimaryTerm());

advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(delete.seqNo());
advanceMaxSeqNoOfDeletesOnPrimary(delete.seqNo());
} else {
markSeqNoAsSeen(delete.seqNo());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
/**
* An engine implementation for following shards.
*/
public final class FollowingEngine extends InternalEngine {
public class FollowingEngine extends InternalEngine {


/**
Expand Down Expand Up @@ -118,14 +118,27 @@ protected long generateSeqNoForOperationOnPrimary(final Operation operation) {
}

@Override
protected void advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(long seqNo) {
protected void advanceMaxSeqNoOfDeletesOnPrimary(long seqNo) {
if (Assertions.ENABLED) {
final long localCheckpoint = getProcessedLocalCheckpoint();
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
assert localCheckpoint < maxSeqNoOfUpdates || maxSeqNoOfUpdates >= seqNo :
"maxSeqNoOfUpdates is not advanced local_checkpoint=" + localCheckpoint + " msu=" + maxSeqNoOfUpdates + " seq_no=" + seqNo;
}
super.advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(seqNo); // extra safe in production code

super.advanceMaxSeqNoOfDeletesOnPrimary(seqNo);
}

@Override
protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) {
// In some scenarios it is possible to advance maxSeqNoOfUpdatesOrDeletes beyond the leader's
// maxSeqNoOfUpdatesOrDeletes: this engine (effectively a replica) does not check whether the previous
// version of a document was a delete, so a fresh index operation that follows a delete may be treated
// as an update, which pushes the max sequence number past the leader's value.
// We conservatively advance the seqno in this case, accepting a minor performance hit in this edge case.

// See FollowingEngineTests#testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates or #72527 for more details.
super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
Expand All @@ -32,8 +32,10 @@
import org.elasticsearch.index.engine.EngineTestCase;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.engine.TranslogHandler;
import org.elasticsearch.index.mapper.IdFieldMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.RetentionLeases;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -56,6 +58,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -294,12 +297,34 @@ private FollowingEngine createEngine(Store store, EngineConfig config) throws IO

/**
 * Builds an index operation for the following engine using a randomly chosen version:
 * either {@code 1} or whatever {@code randomNonNegativeLong()} returns.
 *
 * @param id     the document id
 * @param seqNo  the sequence number to assign to the operation
 * @param origin the origin of the operation
 * @return the index operation, built by the four-argument overload
 */
private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin) {
    final long pickedVersion;
    if (randomBoolean()) {
        pickedVersion = 1;
    } else {
        pickedVersion = randomNonNegativeLong();
    }
    return indexForFollowing(id, seqNo, origin, pickedVersion);
}

/**
 * Builds an index operation for the following engine with an explicit, externally versioned
 * document version.
 *
 * @param id      the document id
 * @param seqNo   the sequence number to assign to the operation
 * @param origin  the origin of the operation
 * @param version the (external) version of the document
 * @return a new {@code Engine.Index} for a freshly parsed test document with the given id
 */
private Engine.Index indexForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) {
    final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
    final long term = primaryTerm.get();
    final long startedAtMillis = System.currentTimeMillis();
    // Randomised boolean constructor argument; presumably the "is retry" flag -- confirm against Engine.Index.
    final boolean randomFlag = randomBoolean();
    return new Engine.Index(
        EngineTestCase.newUid(doc),
        doc,
        seqNo,
        term,
        version,
        VersionType.EXTERNAL,
        origin,
        startedAtMillis,
        IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP,
        randomFlag,
        SequenceNumbers.UNASSIGNED_SEQ_NO,
        0
    );
}

/**
 * Builds a delete operation for the following engine with an explicit, externally versioned
 * document version.
 *
 * @param id      the id of the document to delete
 * @param seqNo   the sequence number to assign to the operation
 * @param origin  the origin of the operation
 * @param version the (external) version of the delete
 * @return a new {@code Engine.Delete} targeting the document with the given id
 */
private Engine.Delete deleteForFollowing(String id, long seqNo, Engine.Operation.Origin origin, long version) {
    // A parsed document is created only to obtain the type and id passed to the delete.
    final ParsedDocument doc = EngineTestCase.createParsedDoc(id, null);
    final long startedAtNanos = System.nanoTime();
    return new Engine.Delete(doc.type(), doc.id(), new Term(IdFieldMapper.NAME, Uid.encodeId(id)),
        seqNo, primaryTerm.get(), version, VersionType.EXTERNAL, origin, startedAtNanos,
        SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM);
}

private Engine.Index indexForPrimary(String id) {
final ParsedDocument parsedDoc = EngineTestCase.createParsedDoc(id, null);
return new Engine.Index(EngineTestCase.newUid(parsedDoc), primaryTerm.get(), parsedDoc);
Expand Down Expand Up @@ -453,6 +478,87 @@ public void testOptimizeSingleDocConcurrently() throws Exception {
});
}

/**
 * Exercises the race described in #72527 on a {@link FollowingEngine}.
 * <p>
 * The engine first processes: index "1" (seq_no 0), delete "1" (seq_no 1) and index "2" (seq_no 2), after
 * maxSeqNoOfUpdatesOrDeletes has been advanced to the leader's value (3). Then two threads run concurrently:
 * one deletes "2" (seq_no 3) and the other indexes "1" again (seq_no 4). The engine subclass used here holds the
 * second index operation inside {@code advanceMaxSeqNoOfUpdatesOnPrimary} until the concurrent delete has completed,
 * and checks that at that point the processed local checkpoint equals maxSeqNoOfUpdatesOrDeletes.
 * Finally the test verifies that maxSeqNoOfUpdatesOrDeletes is at least the leader's value.
 */
public void testConcurrentIndexOperationsWithDeletesCanAdvanceMaxSeqNoOfUpdates() throws Exception {
    // See #72527 for more details
    Settings followerSettings = Settings.builder()
        .put("index.number_of_shards", 1)
        .put("index.number_of_replicas", 0)
        .put("index.version.created", Version.CURRENT)
        .put("index.xpack.ccr.following_index", true)
        .build();

    IndexMetadata followerIndexMetadata = IndexMetadata.builder(index.getName()).settings(followerSettings).build();
    IndexSettings followerIndexSettings = new IndexSettings(followerIndexMetadata, Settings.EMPTY);
    try (Store followerStore = createStore(shardId, followerIndexSettings, newDirectory())) {
        EngineConfig followerConfig =
            engineConfig(shardId, followerIndexSettings, threadPool, followerStore, logger, xContentRegistry());
        followerStore.createEmpty();
        String translogUuid = Translog.createEmptyTranslog(followerConfig.getTranslogConfig().getTranslogPath(),
            SequenceNumbers.NO_OPS_PERFORMED,
            shardId,
            1L
        );
        followerStore.associateIndexWithNewTranslog(translogUuid);
        // Released by the delete thread once its operation has finished (successfully or not).
        CountDownLatch concurrentDeleteOpLatch = new CountDownLatch(1);
        final long indexNewDocWithSameIdSeqNo = 4;
        FollowingEngine followingEngine = new FollowingEngine(followerConfig) {
            @Override
            protected void advanceMaxSeqNoOfUpdatesOnPrimary(long seqNo) {
                if (seqNo == indexNewDocWithSameIdSeqNo) {
                    // wait until the concurrent delete finishes meaning that processedLocalCheckpoint == maxSeqNoOfUpdatesOrDeletes
                    try {
                        concurrentDeleteOpLatch.await();
                        assertThat(getProcessedLocalCheckpoint(), equalTo(getMaxSeqNoOfUpdatesOrDeletes()));
                    } catch (Exception exception) {
                        throw new RuntimeException(exception);
                    }
                }
                super.advanceMaxSeqNoOfUpdatesOnPrimary(seqNo);
            }
        };
        TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), followerConfig.getIndexSettings());
        followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
        try {
            final long leaderMaxSeqNoOfUpdatesOnPrimary = 3;
            followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(leaderMaxSeqNoOfUpdatesOnPrimary);

            followingEngine.index(indexForFollowing("1", 0, Engine.Operation.Origin.PRIMARY, 1));
            followingEngine.delete(deleteForFollowing("1", 1, Engine.Operation.Origin.PRIMARY, 2));
            followingEngine.index(indexForFollowing("2", 2, Engine.Operation.Origin.PRIMARY, 1));
            assertThat(followingEngine.getProcessedLocalCheckpoint(), equalTo(2L));

            // Three parties: the two worker threads plus the test thread, so both operations start together.
            CyclicBarrier barrier = new CyclicBarrier(3);
            Thread thread1 = new Thread(() -> {
                try {
                    barrier.await();
                    followingEngine.delete(deleteForFollowing("2", 3, Engine.Operation.Origin.PRIMARY, 2));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                } finally {
                    // Always release the indexing thread, even when the delete fails; otherwise it would block
                    // forever on the latch and the join() below would hang instead of letting the test fail.
                    concurrentDeleteOpLatch.countDown();
                }
            });
            Thread thread2 = new Thread(() -> {
                try {
                    barrier.await();
                    followingEngine.index(indexForFollowing("1", indexNewDocWithSameIdSeqNo, Engine.Operation.Origin.PRIMARY, 3));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

            thread1.start();
            thread2.start();
            barrier.await();
            thread1.join();
            thread2.join();

            assertThat(followingEngine.getMaxSeqNoOfUpdatesOrDeletes(), greaterThanOrEqualTo(leaderMaxSeqNoOfUpdatesOnPrimary));
        } finally {
            followingEngine.close();
        }
    }
}

private void runFollowTest(CheckedBiConsumer<InternalEngine, FollowingEngine, Exception> task) throws Exception {
final CheckedBiConsumer<InternalEngine, FollowingEngine, Exception> wrappedTask = (leader, follower) -> {
Thread[] threads = new Thread[between(1, 8)];
Expand Down

0 comments on commit 71902d8

Please sign in to comment.