Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCR: Following primary should process NoOps once #34408

Merged
merged 5 commits into from
Oct 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
Expand Down Expand Up @@ -155,6 +156,7 @@ public class InternalEngine extends Engine {
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;

private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false);
private final KeyedLock<Long> noOpKeyedLock = new KeyedLock<>();

@Nullable
private final String historyUUID;
Expand Down Expand Up @@ -1407,32 +1409,42 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
final long seqNo = noOp.seqNo();
try {
Exception failure = null;
if (softDeleteEnabled) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
tombstone.version().setLongValue(1L);
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeletesField);
indexWriter.addDocument(doc);
} catch (Exception ex) {
if (maybeFailEngine("noop", ex)) {
throw ex;
try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) {
final NoOpResult noOpResult;
final Optional<Exception> preFlightError = preFlightCheckForNoOp(noOp);
if (preFlightError.isPresent()) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get());
} else {
Exception failure = null;
if (softDeleteEnabled) {
try {
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason());
tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
// A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field.
// 1L is selected to optimize the compression because it might probably be the most common value in version field.
tombstone.version().setLongValue(1L);
assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
final ParseContext.Document doc = tombstone.docs().get(0);
assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
: "Noop tombstone document but _tombstone field is not set [" + doc + " ]";
doc.add(softDeletesField);
indexWriter.addDocument(doc);
} catch (Exception ex) {
if (maybeFailEngine("noop", ex)) {
throw ex;
}
failure = ex;
}
failure = ex;
}
}
final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo());
if (noOp.origin().isFromTranslog() == false) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
if (failure == null) {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo());
} else {
noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure);
}
if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) {
final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
noOpResult.setTranslogLocation(location);
}
}
noOpResult.setTook(System.nanoTime() - noOp.startTime());
noOpResult.freeze();
Expand All @@ -1444,6 +1456,14 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
}
}

/**
* Executes a pre-flight check for a given NoOp.
* If this method returns a non-empty result, the engine won't process this NoOp and returns a failure.
* <p>
* This base implementation performs no check and always returns an empty result; it is a protected hook
* that subclasses may override to reject a NoOp before it is processed.
*
* @param noOp the no-op operation that is about to be processed
* @return an empty {@link Optional} to let the NoOp proceed, or the exception to report as the failure of the NoOp
* @throws IOException never thrown by this implementation; presumably declared so that overriding checks may perform I/O
*/
protected Optional<Exception> preFlightCheckForNoOp(final NoOp noOp) throws IOException {
return Optional.empty();
}

@Override
public void refresh(String source) throws EngineException {
refresh(source, SearcherScope.EXTERNAL);
Expand Down Expand Up @@ -2354,8 +2374,14 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException {
* @return true if the given operation was processed; otherwise false.
*/
protected final boolean hasBeenProcessedBefore(Operation op) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
if (Assertions.ENABLED) {
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no";
if (op.operationType() == Operation.TYPE.NO_OP) {
assert noOpKeyedLock.isHeldByCurrentThread(op.seqNo());
} else {
assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes());
}
}
return localCheckpointTracker.contains(op.seqNo());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2978,6 +2978,7 @@ public long addDocument(Iterable<? extends IndexableField> doc) throws IOExcepti
private void maybeThrowFailure() throws IOException {
if (failureToThrow.get() != null) {
Exception failure = failureToThrow.get().get();
clearFailure(); // one shot
if (failure instanceof RuntimeException) {
throw (RuntimeException) failure;
} else if (failure instanceof IOException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;

/**
Expand Down Expand Up @@ -111,9 +112,14 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del
}

@Override
public NoOpResult noOp(NoOp noOp) {
// TODO: Make sure we process NoOp once.
return super.noOp(noOp);
protected Optional<Exception> preFlightCheckForNoOp(NoOp noOp) throws IOException {
if (noOp.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(noOp)) {
// See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation.
final OptionalLong existingTerm = lookupPrimaryTerm(noOp.seqNo());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

random thought - I wonder if we should load the operation under assertion code and check it's the same (this goes for all duplicate ops).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the existing operation should equal the processing operation except for the primary term?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++. I'll make it in a follow-up after this PR.

return Optional.of(new AlreadyProcessedFollowingEngineException(shardId, noOp.seqNo(), existingTerm));
} else {
return super.preFlightCheckForNoOp(noOp);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase;
import org.elasticsearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -240,6 +241,14 @@ public void testRetryBulkShardOperations() throws Exception {
followerGroup.startAll();
leaderGroup.appendDocs(between(10, 100));
leaderGroup.refresh("test");
for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) {
long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1;
Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(),
Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i);
for (IndexShard shard : leaderGroup) {
getEngine(shard).noOp(noOp);
}
}
for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) {
BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId));
assertThat(resp.getFailure(), nullValue());
Expand Down Expand Up @@ -276,11 +285,14 @@ public void testRetryBulkShardOperations() throws Exception {
SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats();
shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(),
leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo());
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
});
shardFollowTask.markAsCompleted();
try {
assertBusy(() -> {
assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint()));
assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup);
});
} finally {
shardFollowTask.markAsCompleted();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception {
final Translog.Operation op;
if (randomBoolean()) {
op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, SOURCE, null, -1);
} else {
} else if (randomBoolean()) {
op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0);
} else {
op = new Translog.NoOp(seqno++, primaryTerm, "test-" + i);
}
if (randomBoolean()) {
firstBulk.add(op);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private Engine.Delete deleteForPrimary(String id) {

private Engine.Result applyOperation(Engine engine, Engine.Operation op,
long primaryTerm, Engine.Operation.Origin origin) throws IOException {
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : null;
final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? VersionType.EXTERNAL : null;
final Engine.Result result;
if (op instanceof Engine.Index) {
Engine.Index index = (Engine.Index) op;
Expand Down Expand Up @@ -572,9 +572,12 @@ public void testProcessOnceOnPrimary() throws Exception {
if (randomBoolean()) {
operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true));
} else {
} else if (randomBoolean()) {
operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L,
VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis()));
} else {
operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY,
threadPool.relativeTimeInMillis(), "test-" + i));
}
}
Randomness.shuffle(operations);
Expand Down