diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 090b5233fbe40..aff51bcb3f9b5 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -60,6 +60,7 @@ import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.IndexSettings; @@ -158,6 +159,7 @@ public class InternalEngine extends Engine { private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener; private final AtomicBoolean trackTranslogLocation = new AtomicBoolean(false); + private final KeyedLock noOpKeyedLock = new KeyedLock<>(); @Nullable private final String historyUUID; @@ -1487,32 +1489,42 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED; final long seqNo = noOp.seqNo(); - try { - Exception failure = null; - if (softDeleteEnabled) { - try { - final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); - tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); - // A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field. - // 1L is selected to optimize the compression because it might probably be the most common value in version field. - tombstone.version().setLongValue(1L); - assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]"; - final ParseContext.Document doc = tombstone.docs().get(0); - assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null - : "Noop tombstone document but _tombstone field is not set [" + doc + " ]"; - doc.add(softDeletesField); - indexWriter.addDocument(doc); - } catch (Exception ex) { - if (maybeFailEngine("noop", ex)) { - throw ex; + try (Releasable ignored = noOpKeyedLock.acquire(seqNo)) { + final NoOpResult noOpResult; + final Optional preFlightError = preFlightCheckForNoOp(noOp); + if (preFlightError.isPresent()) { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), preFlightError.get()); + } else { + Exception failure = null; + if (softDeleteEnabled) { + try { + final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc(noOp.reason()); + tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm()); + // A noop tombstone does not require a _version but it's added to have a fully dense docvalues for the version field. + // 1L is selected to optimize the compression because it might probably be the most common value in version field. + tombstone.version().setLongValue(1L); + assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]"; + final ParseContext.Document doc = tombstone.docs().get(0); + assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null + : "Noop tombstone document but _tombstone field is not set [" + doc + " ]"; + doc.add(softDeletesField); + indexWriter.addDocument(doc); + } catch (Exception ex) { + if (maybeFailEngine("noop", ex)) { + throw ex; + } + failure = ex; } - failure = ex; } - } - final NoOpResult noOpResult = failure != null ? new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure) : new NoOpResult(getPrimaryTerm(), noOp.seqNo()); - if (noOp.origin().isFromTranslog() == false) { - final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); - noOpResult.setTranslogLocation(location); + if (failure == null) { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo()); + } else { + noOpResult = new NoOpResult(getPrimaryTerm(), noOp.seqNo(), failure); + } + if (noOp.origin().isFromTranslog() == false && noOpResult.getResultType() == Result.Type.SUCCESS) { + final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason())); + noOpResult.setTranslogLocation(location); + } } noOpResult.setTook(System.nanoTime() - noOp.startTime()); noOpResult.freeze(); @@ -1524,6 +1536,14 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException { } } + /** + * Executes a pre-flight check for a given NoOp. + * If this method returns a non-empty result, the engine won't process this NoOp and returns a failure. + */ + protected Optional preFlightCheckForNoOp(final NoOp noOp) throws IOException { + return Optional.empty(); + } + @Override public void refresh(String source) throws EngineException { refresh(source, SearcherScope.EXTERNAL); @@ -2434,8 +2454,14 @@ public void waitForOpsToComplete(long seqNo) throws InterruptedException { * @return true if the given operation was processed; otherwise false. */ protected final boolean hasBeenProcessedBefore(Operation op) { - assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; - assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); + if (Assertions.ENABLED) { + assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "operation is not assigned seq_no"; + if (op.operationType() == Operation.TYPE.NO_OP) { + assert noOpKeyedLock.isHeldByCurrentThread(op.seqNo()); + } else { + assert versionMap.assertKeyedLockHeldByCurrentThread(op.uid().bytes()); + } + } return localCheckpointTracker.contains(op.seqNo()); } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 628ae1e5629db..e1a1047c13e60 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3009,6 +3009,7 @@ public long addDocument(Iterable doc) throws IOExcepti private void maybeThrowFailure() throws IOException { if (failureToThrow.get() != null) { Exception failure = failureToThrow.get().get(); + clearFailure(); // one shot if (failure instanceof RuntimeException) { throw (RuntimeException) failure; } else if (failure instanceof IOException) { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 60daa55507476..0caec5e97903f 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -26,6 +26,7 @@ import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.Optional; import java.util.OptionalLong; /** @@ -109,9 +110,14 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del } @Override - public NoOpResult noOp(NoOp noOp) { - // TODO: Make sure we process NoOp once. - return super.noOp(noOp); + protected Optional preFlightCheckForNoOp(NoOp noOp) throws IOException { + if (noOp.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(noOp)) { + // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. + final OptionalLong existingTerm = lookupPrimaryTerm(noOp.seqNo()); + return Optional.of(new AlreadyProcessedFollowingEngineException(shardId, noOp.seqNo(), existingTerm)); + } else { + return super.preFlightCheckForNoOp(noOp); + } } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index ca19f5c6ff519..f760a420c71cd 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.seqno.SeqNoStats; @@ -241,6 +242,14 @@ public void testRetryBulkShardOperations() throws Exception { followerGroup.startAll(); leaderGroup.appendDocs(between(10, 100)); leaderGroup.refresh("test"); + for (int numNoOps = between(1, 10), i = 0; i < numNoOps; i++) { + long seqNo = leaderGroup.getPrimary().seqNoStats().getMaxSeqNo() + 1; + Engine.NoOp noOp = new Engine.NoOp(seqNo, leaderGroup.getPrimary().getOperationPrimaryTerm(), + Engine.Operation.Origin.REPLICA, threadPool.relativeTimeInMillis(), "test-" + i); + for (IndexShard shard : leaderGroup) { + getEngine(shard).noOp(noOp); + } + } for (String deleteId : randomSubsetOf(IndexShardTestCase.getShardDocUIDs(leaderGroup.getPrimary()))) { BulkItemResponse resp = leaderGroup.delete(new DeleteRequest("test", "type", deleteId)); assertThat(resp.getFailure(), nullValue()); @@ -277,11 +286,14 @@ public void testRetryBulkShardOperations() throws Exception { SeqNoStats followerSeqNoStats = followerGroup.getPrimary().seqNoStats(); shardFollowTask.start(followerGroup.getPrimary().getHistoryUUID(), leadingPrimary.getGlobalCheckpoint(), leadingPrimary.getMaxSeqNoOfUpdatesOrDeletes(), followerSeqNoStats.getGlobalCheckpoint(), followerSeqNoStats.getMaxSeqNo()); - assertBusy(() -> { - assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); - assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); - }); - shardFollowTask.markAsCompleted(); + try { + assertBusy(() -> { + assertThat(followerGroup.getPrimary().getGlobalCheckpoint(), equalTo(leadingPrimary.getGlobalCheckpoint())); + assertConsistentHistoryBetweenLeaderAndFollower(leaderGroup, followerGroup); + }); + } finally { + shardFollowTask.markAsCompleted(); + } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index 5aa29c6bea873..03155af7c9e9e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -104,8 +104,10 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { final Translog.Operation op; if (randomBoolean()) { op = new Translog.Index("_doc", id, seqno++, primaryTerm, 0, VersionType.EXTERNAL, SOURCE, null, null, -1); - } else { + } else if(randomBoolean()) { op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), seqno++, primaryTerm, 0, VersionType.EXTERNAL); + } else { + op = new Translog.NoOp(seqno++, primaryTerm, "test-" + i); } if (randomBoolean()) { firstBulk.add(op); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 17128269f815d..c53b2b8c75d2a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -306,16 +306,15 @@ private Engine.Delete deleteForPrimary(String id) { private Engine.Result applyOperation(Engine engine, Engine.Operation op, long primaryTerm, Engine.Operation.Origin origin) throws IOException { - final VersionType versionType = origin == Engine.Operation.Origin.PRIMARY ? op.versionType() : VersionType.EXTERNAL; final Engine.Result result; if (op instanceof Engine.Index) { Engine.Index index = (Engine.Index) op; result = engine.index(new Engine.Index(index.uid(), index.parsedDoc(), index.seqNo(), primaryTerm, index.version(), - versionType, origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry())); + index.versionType(), origin, index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry())); } else if (op instanceof Engine.Delete) { Engine.Delete delete = (Engine.Delete) op; result = engine.delete(new Engine.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, - delete.version(), versionType, origin, delete.startTime())); + delete.version(), delete.versionType(), origin, delete.startTime())); } else { Engine.NoOp noOp = (Engine.NoOp) op; result = engine.noOp(new Engine.NoOp(noOp.seqNo(), primaryTerm, origin, noOp.startTime(), noOp.reason())); @@ -573,9 +572,12 @@ public void testProcessOnceOnPrimary() throws Exception { if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); - } else { + } else if (randomBoolean()) { operations.add(new Engine.Delete(doc.type(), doc.id(), EngineTestCase.newUid(doc), i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis())); + } else { + operations.add(new Engine.NoOp(i, primaryTerm.get(), Engine.Operation.Origin.PRIMARY, + threadPool.relativeTimeInMillis(), "test-" + i)); } } Randomness.shuffle(operations);