Skip to content

Commit

Permalink
Do not add multiple copies of stale docs to Lucene
Browse files Browse the repository at this point in the history
Since elastic#29679 we started adding stale operations to Lucene to have a complete history in Lucene. As stale docs are rare, we accepted having duplicate copies of them in order to keep the engine simple.

However, we now need to make sure that we have a single copy per stale operation in Lucene because the Lucene rollback requires a single document for each sequence number.
  • Loading branch information
dnhatn committed Jul 4, 2018
1 parent f736205 commit b0f3014
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,8 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version());
boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(index.seqNo()) == false;
plan = IndexingStrategy.processAsStaleOp(addStaleOpToLucene, index.seqNo(), index.version());
} else {
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
index.seqNo(), index.version());
Expand Down Expand Up @@ -1258,7 +1259,8 @@ protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws
} else {
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(delete);
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
boolean addStaleOpToLucene = softDeleteEnabled && localCheckpointTracker.isProcessed(delete.seqNo()) == false;
plan = DeletionStrategy.processAsStaleOp(addStaleOpToLucene, false, delete.seqNo(), delete.version());
} else {
plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
delete.seqNo(), delete.version());
Expand Down Expand Up @@ -1400,7 +1402,9 @@ public void maybePruneDeletes() {
@Override
public NoOpResult noOp(final NoOp noOp) {
NoOpResult noOpResult;
try (ReleasableLock ignored = readLock.acquire()) {
try (ReleasableLock ignored = readLock.acquire();
// prevent two noOps with same seqno get in at the same time
Releasable uidLock = versionMap.acquireLock(new BytesRef(Long.toString(noOp.seqNo())))) {
noOpResult = innerNoOp(noOp);
} catch (final Exception e) {
noOpResult = new NoOpResult(noOp.seqNo(), e);
Expand All @@ -1414,7 +1418,7 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
final long seqNo = noOp.seqNo();
try {
Exception failure = null;
if (softDeleteEnabled) {
if (softDeleteEnabled && localCheckpointTracker.isProcessed(noOp.seqNo()) == false) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ public synchronized void markSeqNoAsCompleted(final long seqNo) {
}
}

/**
 * Returns whether the given sequence number has already been marked as processed
 * by this tracker.
 *
 * @param seqNo the sequence number to check
 * @return {@code true} if {@code seqNo} was completed on this tracker, otherwise {@code false}
 */
public synchronized boolean isProcessed(long seqNo) {
    // Everything at or below the checkpoint is processed by definition.
    if (seqNo <= checkpoint) {
        return true;
    }
    // Sequence numbers at or beyond the next one to be issued cannot have been seen yet.
    if (seqNo >= nextSeqNo) {
        return false;
    }
    // In the gap between checkpoint and nextSeqNo: consult the sparse bit sets that
    // record out-of-order completions.
    final CountedBitSet bits = processedSeqNo.get(getBitSetKey(seqNo));
    if (bits == null) {
        return false;
    }
    return bits.get(seqNoToBitSetOffset(seqNo));
}

/**
* Resets the checkpoint to the specified value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import java.util.Base64;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -4947,6 +4948,44 @@ public void testKeepMinRetainedSeqNoByMergePolicy() throws IOException {
}
}

/**
 * Replays multiple copies of the same stale operation (index, delete, or no-op) per
 * sequence number, then verifies that the engine indexed exactly one Lucene document
 * per sequence number — i.e. duplicates of stale operations are not added to Lucene.
 */
public void testDoNotIndexDuplicateStaleDocsToLucene() throws Exception {
    final int numOps = scaledRandomIntBetween(10, 200);
    final List<Engine.Operation> operations = new ArrayList<>();
    final Map<String, Long> versionPerId = new HashMap<>();
    for (int seqNo = 0; seqNo < numOps; seqNo++) {
        // A small id space so sequence numbers collide on the same documents.
        final String id = Integer.toString(randomIntBetween(1, 5));
        final long version = versionPerId.compute(id, (k, v) -> (v == null ? 1 : v) + between(1, 10));
        final int duplicates = between(1, 3);
        // NOTE: the random draws below happen in the same order as before so that
        // seed-based reproducibility is unchanged.
        final int kind = randomBoolean() ? 0 : frequently() ? 1 : 2;
        for (int i = 0; i < duplicates; i++) {
            switch (kind) {
                case 0:
                    operations.add(replicaIndexForDoc(createParsedDoc(id, null), version, seqNo, randomBoolean()));
                    break;
                case 1:
                    operations.add(replicaDeleteForDoc(id, version, seqNo, randomNonNegativeLong()));
                    break;
                default:
                    operations.add(new Engine.NoOp(seqNo, primaryTerm.get(), REPLICA, randomNonNegativeLong(), "test-" + seqNo));
                    break;
            }
        }
    }
    final Settings settings = Settings.builder().put(defaultSettings.getSettings())
        .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build();
    final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
    try (Store store = createStore();
         InternalEngine engine = createEngine(
             config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(), newMergePolicy(), null))) {
        // Apply the duplicated operations concurrently and in random order.
        Randomness.shuffle(operations);
        concurrentlyApplyOps(operations, engine);
        engine.refresh("test", Engine.SearcherScope.INTERNAL);
        // Exactly one Lucene document per sequence number must exist.
        try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
            assertThat(searcher.reader().maxDoc(), equalTo(numOps));
        }
        assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
    }
}

private static void trimUnsafeCommits(EngineConfig config) throws IOException {
final Store store = config.getStore();
final TranslogConfig translogConfig = config.getTranslogConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,20 @@ public void testSimplePrimary() {

// Verifies isProcessed() on a replica-style tracker as seq-nos complete out of order:
// the checkpoint only advances once the gap at seqNo 1 is filled.
public void testSimpleReplica() {
// Fresh tracker: no ops performed, nothing is processed yet.
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
assertThat(tracker.isProcessed(randomNonNegativeLong()), equalTo(false));
// Completing seqNo 0 advances the checkpoint to 0.
tracker.markSeqNoAsCompleted(0L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.isProcessed(0L), equalTo(true));
assertThat(tracker.isProcessed(between(1, Integer.MAX_VALUE)), equalTo(false));
// Completing seqNo 2 out of order: checkpoint stays at 0 because 1 is missing,
// but 2 is still reported as processed via the bit set.
tracker.markSeqNoAsCompleted(2L);
assertThat(tracker.getCheckpoint(), equalTo(0L));
assertThat(tracker.isProcessed(1L), equalTo(false));
assertThat(tracker.isProcessed(2L), equalTo(true));
assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false));
// Filling the gap at 1 lets the checkpoint jump to 2; 0..2 are all processed.
tracker.markSeqNoAsCompleted(1L);
assertThat(tracker.getCheckpoint(), equalTo(2L));
assertThat(tracker.isProcessed(between(0, 2)), equalTo(true));
assertThat(tracker.isProcessed(between(3, Integer.MAX_VALUE)), equalTo(false));
}

public void testLazyInitialization() {
Expand Down Expand Up @@ -199,9 +207,14 @@ protected void doRun() throws Exception {
}
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
assertThat(tracker.isProcessed(randomValueOtherThan((int) unFinishedSeq, () -> randomFrom(seqNos))), equalTo(true));
assertThat(tracker.isProcessed(unFinishedSeq), equalTo(false));
assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false));
tracker.markSeqNoAsCompleted(unFinishedSeq);
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
assertThat(tracker.isProcessed(randomFrom(seqNos)), equalTo(true));
assertThat(tracker.isProcessed(between(maxOps, Integer.MAX_VALUE)), equalTo(false));
if (tracker.processedSeqNo.size() == 1) {
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
}
Expand Down

0 comments on commit b0f3014

Please sign in to comment.