Skip to content

Commit

Permalink
CCR: Following primary should process NoOps once (#34408)
Browse files Browse the repository at this point in the history
This is a follow-up for #34288.

Relates #34412
  • Loading branch information
dnhatn authored Oct 20, 2018
1 parent ba87c54 commit d90b673
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -155,6 +156,7 @@ public class InternalEngine extends Engine {
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();

@Nullable
private final String historyUUID;
Expand Down Expand Up @@ -1407,32 +1409,42 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
Exception failure = null;
if (softDeleteEnabled) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
tombstone.version().setLongValue(1L);
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeletesField);
indexWriter.addDocument(doc);
} catch (Exception ex) {
if (maybeFailEngine("noop", ex)) {
throw ex;
try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) {
final NoOpResult noOpResult;
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
if (preFlightError.isPresent()) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get());
} else {
Exception failure = null;
if (softDeleteEnabled) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
tombstone.version().setLongValue(1L);
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeletesField);
indexWriter.addDocument(doc);
} catch (Exception ex) {
if (maybeFailEngine("noop", ex)) {
throw ex;
}
failure = ex;
}
failure = ex;
}
}
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
if (failure == null) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
} else {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure);
}
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
}
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand All @@ -1444,6 +1456,14 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
}

/**
* Executes a pre-flight check for a given NoOp.
* If this method returns a non-empty result, the engine won't process this NoOp and returns a failure.
*/
protected Optional<Exception> preFlightCheckForNoOp(final NoOp noOp) throws IOException {
return Optional.empty();
}

@Override
public void refresh(String source) throws EngineException {
refresh(source, SearcherScope.EXTERNAL);
Expand Down Expand Up @@ -2354,8 +2374,14 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
* @return true if the given operation was processed; otherwise false.
*/
protected final boolean hasBeenProcessedBefore(Operation op) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
if (Assertions.ENABLED) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
if (op.operationType() == Operation.TYPE.NO_OP) {
assert noOpKeyedLock.isHeldByCurrentThread(op.seqNo());
} else {
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
}
}
return localCheckpointTracker.contains(op.seqNo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2978,6 +2978,7 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
private void maybeThrowFailure() throws IOException {
if (failureToThrow.get() != null) {
Exception failure = failureToThrow.get().get();
clearFailure(); // one shot
if (failure instanceof RuntimeException) {
throw (RuntimeException) failure;
} else if (failure instanceof IOException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;

/**
Expand Down Expand Up @@ -111,9 +112,14 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del
}

@Override
public NoOpResult noOp(NoOp noOp) {
// TODO: Make sure we process NoOp once.
return super.noOp(noOp);
protected Optional<Exception> preFlightCheckForNoOp(NoOp noOp) throws IOException {
if (noOp.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(noOp)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final OptionalLong existingTerm = lookupPrimaryTerm(noOp.seqNo());
return Optional.of(new AlreadyProcessedFollowingEngineException(shardId, noOp.seqNo(), existingTerm));
} else {
return super.preFlightCheckForNoOp(noOp);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -240,6 +241,14 @@ public void testRetryBulkShardOperations() throws Exception {
followerGroup.startAll();
leaderGroup.appendDocs(between(10, 100));
leaderGroup.refresh("test");
for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
for (IndexShard shard : leaderGroup) {
getEngine(shard).noOp(noOp);
}
}
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
assertThat(resp.getFailure(), nullValue());
Expand Down Expand Up @@ -276,11 +285,14 @@ public void testRetryBulkShardOperations() throws Exception {
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
});
shardFollowTask.markAsCompleted();
try {
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
});
} finally {
shardFollowTask.markAsCompleted();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception {
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1);
} else {
} else if (randomBoolean()) {
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0);
} else {
op = new Translog.NoOp(seqno++, primaryTerm, "test-" + i);
}
if (randomBoolean()) {
firstBulk.add(op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private Engine.Delete deleteForPrimary(String id) {

private Engine.Result applyOperation(Engine engine, Engine.Operation op,
long primaryTerm, Engine.Operation.Origin origin) throws IOException {
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null;
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? VersionType.EXTERNAL : null;
final Engine.Result result;
if (op instanceof Engine.Index) {
Engine.Index index = (Engine.Index) op;
Expand Down Expand Up @@ -572,9 +572,12 @@ public void testProcessOnceOnPrimary() throws Exception {
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
} else {
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY,
threadPool.relativeTimeInMillis(), "test-" + i));
}
}
Randomness.shuffle(operations);
Expand Down

0 comments on commit d90b673

Please sign in to comment.