diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 38ba0257491f4..496e7fb8901f0 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -2410,9 +2410,7 @@ public Translog.Snapshot newChangesSnapshot(String source, MapperService mapperS long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { // TODO: Should we defer the refresh until we really need it? ensureOpen(); - if (lastRefreshedCheckpoint() < toSeqNo) { - refresh(source, SearcherScope.INTERNAL); - } + refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); try { LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot( @@ -2522,6 +2520,15 @@ final long lastRefreshedCheckpoint() { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + /** + * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. + */ + protected final void refreshIfNeeded(String source, long requestingSeqNo) { + if (lastRefreshedCheckpoint() < requestingSeqNo) { + refresh(source, SearcherScope.INTERNAL); + } + } + private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { final AtomicLong refreshedCheckpoint; private long pendingCheckpoint; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index bb1efd6997393..3e563e6d5382e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -49,15 +49,20 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; @@ -65,6 +70,7 @@ import org.elasticsearch.index.MapperTestUtils; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; @@ -72,6 +78,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; +import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; @@ -307,6 +314,27 @@ protected static ParsedDocument testParsedDocument( mappingUpdate); } + public static CheckedFunction nestedParsedDocFactory() throws Exception { + final MapperService mapperService = createMapperService("type"); + final String nestedMapping = Strings.toString(XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("properties").startObject("nested_field").field("type", "nested").endObject().endObject() + .endObject().endObject()); + final DocumentMapper nestedMapper = mapperService.documentMapperParser().parse("type", new CompressedXContent(nestedMapping)); + return docId -> { + final XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("field", "value"); + final int nestedValues = between(0, 3); + if (nestedValues > 0) { + XContentBuilder nestedField = source.startObject("nested_field"); + for (int i = 0; i < nestedValues; i++) { + nestedField.field("field-" + i, "value-" + i); + } + source.endObject(); + } + source.endObject(); + return nestedMapper.parse(SourceToParse.source("test", "type", docId, BytesReference.bytes(source), XContentType.JSON)); + }; + } + /** * Creates a tombstone document that only includes uid, seq#, term and version fields. */ diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 6d5df143eeae0..4a4b4648776b1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -31,7 +31,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.function.Function; public class TransportBulkShardOperationsAction extends TransportWriteAction { @@ -68,6 +67,41 @@ protected WritePrimaryResult rewriteWithTerm = operation -> { - final Translog.Operation operationWithPrimaryTerm; - switch (operation.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) operation; - operationWithPrimaryTerm = new Translog.Index( - index.type(), - index.id(), - index.seqNo(), - primary.getOperationPrimaryTerm(), - index.version(), - BytesReference.toBytes(index.source()), - index.routing(), - index.getAutoGeneratedIdTimestamp()); - break; - case DELETE: - final Translog.Delete delete = (Translog.Delete) operation; - operationWithPrimaryTerm = new Translog.Delete( - delete.type(), - delete.id(), - delete.uid(), - delete.seqNo(), - primary.getOperationPrimaryTerm(), - delete.version()); - break; - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) operation; - operationWithPrimaryTerm = new Translog.NoOp(noOp.seqNo(), primary.getOperationPrimaryTerm(), noOp.reason()); - break; - default: - throw new IllegalStateException("unexpected operation type [" + operation.opType() + "]"); - } - return operationWithPrimaryTerm; - }; - assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]"; primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes); final List appliedOperations = new ArrayList<>(sourceOperations.size()); Translog.Location location = null; - long waitingForGlobalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; for (Translog.Operation sourceOp : sourceOperations) { - final Translog.Operation targetOp = rewriteWithTerm.apply(sourceOp); + final Translog.Operation targetOp = rewriteOperationWithPrimaryTerm(sourceOp, primary.getOperationPrimaryTerm()); final Engine.Result result = primary.applyTranslogOperation(targetOp, Engine.Operation.Origin.PRIMARY); if (result.getResultType() == Engine.Result.Type.SUCCESS) { assert result.getSeqNo() == targetOp.seqNo(); @@ -131,23 +129,28 @@ public static CcrWritePrimaryResult shardOperationOnPrimary( location = locationToSync(location, result.getTranslogLocation()); } else { if (result.getFailure() instanceof AlreadyProcessedFollowingEngineException) { - // Skipped operations will be delivered to replicas via primary-replica resync or peer-recovery. - // The primary must not acknowledge this request until the global checkpoint is at least the highest - // seqno of all skipped operations (i.e., all skipped operations have been processed on every replica). - waitingForGlobalCheckpoint = SequenceNumbers.max(waitingForGlobalCheckpoint, targetOp.seqNo()); + // The existing operations below the global checkpoint won't be replicated as they were processed + // in every replicas already. However, the existing operations above the global checkpoint will be + // replicated to replicas but with the existing primary term (not the current primary term) in order + // to guarantee the consistency between the primary and replicas, and between translog and Lucene index. + final AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + assert failure.getSeqNo() == targetOp.seqNo() : targetOp.seqNo() + " != " + failure.getSeqNo(); + if (failure.getExistingPrimaryTerm().isPresent()) { + appliedOperations.add(rewriteOperationWithPrimaryTerm(sourceOp, failure.getExistingPrimaryTerm().getAsLong())); + } else if (targetOp.seqNo() > primary.getGlobalCheckpoint()) { + assert false : "can't find primary_term for existing op=" + targetOp + " gcp=" + primary.getGlobalCheckpoint(); + throw new IllegalStateException("can't find primary_term for existing op=" + targetOp + + " global_checkpoint=" + primary.getGlobalCheckpoint(), failure); + } } else { assert false : "Only already-processed error should happen; op=[" + targetOp + "] error=[" + result.getFailure() + "]"; throw ExceptionsHelper.convertToElastic(result.getFailure()); } } } - assert appliedOperations.size() == sourceOperations.size() || waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : - "waiting global checkpoint is not assigned; waiting_gcp=" + waitingForGlobalCheckpoint + - " source_ops=" + sourceOperations.size() + " applied_ops=" + sourceOperations.size(); - assert appliedOperations.size() == 0 || location != null; final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest( shardId, historyUUID, appliedOperations, maxSeqNoOfUpdatesOrDeletes); - return new CcrWritePrimaryResult(replicaRequest, location, primary, waitingForGlobalCheckpoint, logger); + return new CcrWritePrimaryResult(replicaRequest, location, primary, logger); } @Override @@ -184,12 +187,8 @@ protected BulkShardOperationsResponse newResponseInstance() { * Custom write result to include global checkpoint after ops have been replicated. */ static final class CcrWritePrimaryResult extends WritePrimaryResult { - final long waitingForGlobalCheckpoint; - - CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, - long waitingForGlobalCheckpoint, Logger logger) { + CcrWritePrimaryResult(BulkShardOperationsRequest request, Translog.Location location, IndexShard primary, Logger logger) { super(request, new BulkShardOperationsResponse(), location, null, primary, logger); - this.waitingForGlobalCheckpoint = waitingForGlobalCheckpoint; } @Override @@ -201,19 +200,7 @@ public synchronized void respond(ActionListener lis response.setMaxSeqNo(seqNoStats.getMaxSeqNo()); listener.onResponse(response); }, listener::onFailure); - - if (waitingForGlobalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO) { - primary.addGlobalCheckpointListener(waitingForGlobalCheckpoint, (gcp, e) -> { - if (e != null) { - listener.onFailure(e); - } else { - assert waitingForGlobalCheckpoint <= gcp : waitingForGlobalCheckpoint + " > " + gcp; - super.respond(wrappedListener); - } - }, null); - } else { - super.respond(wrappedListener); - } + super.respond(wrappedListener); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java index 9e19c93b2867a..3033ba31c8253 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/AlreadyProcessedFollowingEngineException.java @@ -9,8 +9,27 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.ShardId; +import java.util.OptionalLong; + +/** + * An exception represents that an operation was processed before on the {@link FollowingEngine} of the primary of a follower. + * The field {@code existingPrimaryTerm} is empty only if the operation is below the global checkpoint; otherwise it should be non-empty. + */ public final class AlreadyProcessedFollowingEngineException extends VersionConflictEngineException { - AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo) { - super(shardId, "operation [{}] was processed before", null, seqNo); + private final long seqNo; + private final OptionalLong existingPrimaryTerm; + + AlreadyProcessedFollowingEngineException(ShardId shardId, long seqNo, OptionalLong existingPrimaryTerm) { + super(shardId, "operation [{}] was processed before with term [{}]", null, seqNo, existingPrimaryTerm); + this.seqNo = seqNo; + this.existingPrimaryTerm = existingPrimaryTerm; + } + + public long getSeqNo() { + return seqNo; + } + + public OptionalLong getExistingPrimaryTerm() { + return existingPrimaryTerm; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java index 8a413ce498066..84aa141c80d1d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngine.java @@ -5,14 +5,28 @@ */ package org.elasticsearch.xpack.ccr.index.engine; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.ReaderUtil; +import org.apache.lucene.search.BooleanClause; +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.search.Query; +import org.apache.lucene.search.TopDocs; +import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.EngineConfig; import org.elasticsearch.index.engine.InternalEngine; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.xpack.ccr.CcrSettings; import java.io.IOException; +import java.util.OptionalLong; /** * An engine implementation for following shards. @@ -62,13 +76,13 @@ protected InternalEngine.IndexingStrategy indexingStrategyForOperation(final Ind /* * The existing operation in this engine was probably assigned the term of the previous primary shard which is different * from the term of the current operation. If the current operation arrives on replicas before the previous operation, - * then the Lucene content between the primary and replicas are not identical (primary terms are different). Since the - * existing operations are guaranteed to be replicated to replicas either via peer-recovery or primary-replica resync, - * we can safely skip this operation here and let the caller know the decision via AlreadyProcessedFollowingEngineException. - * The caller then waits for the global checkpoint to advance at least the seq_no of this operation to make sure that - * the existing operation was replicated to all replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). + * then the Lucene content between the primary and replicas are not identical (primary terms are different). We can safely + * skip the existing operations below the global checkpoint, however must replicate the ones above the global checkpoint + * but with the previous primary term (not the current term of the operation) in order to guarantee the consistency + * between the primary and replicas (see TransportBulkShardOperationsAction#shardOperationOnPrimary). */ - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, index.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, index.seqNo(), lookupPrimaryTerm(index.seqNo())); return IndexingStrategy.skipDueToVersionConflict(error, false, index.version(), index.primaryTerm()); } else { return IndexingStrategy.processButSkipLucene(false, index.seqNo(), index.version()); @@ -88,7 +102,8 @@ protected InternalEngine.DeletionStrategy deletionStrategyForOperation(final Del preFlight(delete); if (delete.origin() == Operation.Origin.PRIMARY && hasBeenProcessedBefore(delete)) { // See the comment in #indexingStrategyForOperation for the explanation why we can safely skip this operation. - final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException(shardId, delete.seqNo()); + final AlreadyProcessedFollowingEngineException error = new AlreadyProcessedFollowingEngineException( + shardId, delete.seqNo(), lookupPrimaryTerm(delete.seqNo())); return DeletionStrategy.skipDueToVersionConflict(error, delete.version(), delete.primaryTerm(), false); } else { return planDeletionAsNonPrimary(delete); @@ -126,6 +141,46 @@ protected boolean assertPrimaryCanOptimizeAddDocument(final Index index) { return true; } + private OptionalLong lookupPrimaryTerm(final long seqNo) throws IOException { + refreshIfNeeded("lookup_primary_term", seqNo); + try (Searcher engineSearcher = acquireSearcher("lookup_primary_term", SearcherScope.INTERNAL)) { + // We have to acquire a searcher before execute this check to ensure that the requesting seq_no is always found in the else + // branch. If the operation is at most the global checkpoint, we should not look up its term as we may have merged away the + // operation. Moreover, we won't need to replicate this operation to replicas since it was processed on every copies already. + if (seqNo <= engineConfig.getGlobalCheckpointSupplier().getAsLong()) { + return OptionalLong.empty(); + } else { + final DirectoryReader reader = Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()); + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + final Query query = new BooleanQuery.Builder() + .add(LongPoint.newExactQuery(SeqNoFieldMapper.NAME, seqNo), BooleanClause.Occur.FILTER) + // excludes the non-root nested documents which don't have primary_term. + .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.FILTER) + .build(); + final TopDocs topDocs = searcher.search(query, 1); + if (topDocs.scoreDocs.length == 1) { + final int docId = topDocs.scoreDocs[0].doc; + final LeafReaderContext leaf = reader.leaves().get(ReaderUtil.subIndex(docId, reader.leaves())); + final NumericDocValues primaryTermDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME); + if (primaryTermDV != null && primaryTermDV.advanceExact(docId - leaf.docBase)) { + assert primaryTermDV.longValue() > 0 : "invalid term [" + primaryTermDV.longValue() + "]"; + return OptionalLong.of(primaryTermDV.longValue()); + } + } + assert false : "seq_no[" + seqNo + "] does not have primary_term, total_hits=[" + topDocs.totalHits + "]"; + throw new IllegalStateException("seq_no[" + seqNo + "] does not have primary_term (total_hits=" + topDocs.totalHits + ")"); + } + } catch (IOException e) { + try { + maybeFailEngine("lookup_primary_term", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + throw e; + } + } + /** * Returns the number of indexing operations that have been optimized (bypass version lookup) using sequence numbers in this engine. * This metric is not persisted, and started from 0 when the engine is opened. diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java index fc6f0563e961e..7a11b30a6f0f5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/IndexFollowingIT.java @@ -271,7 +271,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) assertMaxSeqNoOfUpdatesIsTransferred(resolveLeaderIndex("index1"), resolveFollowerIndex("index2"), numberOfShards); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFollowIndexAndCloseNode() throws Exception { getFollowerCluster().ensureAtLeastNumDataNodes(3); String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); @@ -619,7 +618,6 @@ public void testUnfollowIndex() throws Exception { assertThat(followerClient().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testFailOverOnFollower() throws Exception { int numberOfReplicas = between(1, 2); getFollowerCluster().startMasterOnlyNode(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 7b95252c866f3..6443973ff33e7 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -229,7 +229,6 @@ public void testChangeFollowerHistoryUUID() throws Exception { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/34412") public void testRetryBulkShardOperations() throws Exception { try (ReplicationGroup leaderGroup = createGroup(between(0, 1)); ReplicationGroup followerGroup = createFollowGroup(between(1, 3))) { @@ -306,7 +305,9 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { private ReplicationGroup createFollowGroup(int replicas) throws IOException { Settings.Builder settingsBuilder = Settings.builder(); - settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true); + settingsBuilder.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(between(1, 1000), ByteSizeUnit.KB)); return createGroup(replicas, settingsBuilder.build()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java index dfacb96c31cca..283cf6bf42c36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsTests.java @@ -7,18 +7,18 @@ package org.elasticsearch.xpack.ccr.action.bulk; import org.apache.lucene.index.Term; -import org.elasticsearch.ElasticsearchTimeoutException; -import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.replication.TransportWriteAction; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Randomness; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.translog.Translog; +import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.xpack.ccr.CcrSettings; import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory; @@ -29,6 +29,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.rewriteOperationWithPrimaryTerm; import static org.hamcrest.Matchers.equalTo; import static org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction.CcrWritePrimaryResult; @@ -87,60 +90,11 @@ public void testPrimaryTermFromFollower() throws IOException { closeShards(followerPrimary); } - public void testPrimaryResultWaitForGlobalCheckpoint() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard shard = newStartedShard(false, settings, new FollowingEngineFactory()); - int numOps = between(1, 100); - for (int i = 0; i < numOps; i++) { - final String id = Integer.toString(i); - final Translog.Operation op; - if (randomBoolean()) { - op = new Translog.Index("_doc", id, i, primaryTerm, 0, SOURCE, null, -1); - } else if (randomBoolean()) { - shard.advanceMaxSeqNoOfUpdatesOrDeletes(i); - op = new Translog.Delete("_doc", id, new Term("_id", Uid.encodeId(id)), i, primaryTerm, 0); - } else { - op = new Translog.NoOp(i, primaryTerm, "test"); - } - shard.applyTranslogOperation(op, Engine.Operation.Origin.REPLICA); - } - BulkShardOperationsRequest request = new BulkShardOperationsRequest(); - { - PlainActionFuture listener = new PlainActionFuture<>(); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, -2, logger); - primaryResult.respond(listener); - assertThat("should return intermediately if waiting_global_checkpoint is not specified", listener.isDone(), equalTo(true)); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(shard.getGlobalCheckpoint() + 1, shard.getLocalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.isDone(), equalTo(false)); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(shard.getGlobalCheckpoint(), waitingForGlobalCheckpoint - 1), "test"); - expectThrows(ElasticsearchTimeoutException.class, () -> listener.actionGet(TimeValue.timeValueMillis(1))); - - shard.updateGlobalCheckpointOnReplica(randomLongBetween(waitingForGlobalCheckpoint, shard.getLocalCheckpoint()), "test"); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); - } - { - PlainActionFuture listener = new PlainActionFuture<>(); - long waitingForGlobalCheckpoint = randomLongBetween(-1, shard.getGlobalCheckpoint()); - CcrWritePrimaryResult primaryResult = new CcrWritePrimaryResult(request, null, shard, waitingForGlobalCheckpoint, logger); - primaryResult.respond(listener); - assertThat(listener.get().getMaxSeqNo(), equalTo(shard.seqNoStats().getMaxSeqNo())); - assertThat(listener.get().getGlobalCheckpoint(), equalTo(shard.getGlobalCheckpoint())); - } - closeShards(shard); - } - public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { - final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true).build(); - final IndexShard primary = newStartedShard(true, settings, new FollowingEngineFactory()); + final Settings settings = Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); + final IndexShard oldPrimary = newStartedShard(true, settings, new FollowingEngineFactory()); + final long oldPrimaryTerm = oldPrimary.getOperationPrimaryTerm(); long seqno = 0; List firstBulk = new ArrayList<>(); List secondBulk = new ArrayList<>(); @@ -157,46 +111,41 @@ public void testPrimaryResultIncludeOnlyAppliedOperations() throws Exception { } else { secondBulk.add(op); } + if (rarely()) { + oldPrimary.refresh("test"); + } + if (rarely()) { + oldPrimary.flush(new FlushRequest()); + } } Randomness.shuffle(firstBulk); Randomness.shuffle(secondBulk); - primary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); - - final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), firstBulk, seqno, primary, logger); + oldPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + final CcrWritePrimaryResult fullResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(oldPrimary.shardId(), + oldPrimary.getHistoryUUID(), firstBulk, seqno, oldPrimary, logger); assertThat(fullResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(firstBulk, primary.getOperationPrimaryTerm()))); - assertThat(fullResult.waitingForGlobalCheckpoint, equalTo(-2L)); - - // This bulk includes some operations from the first bulk. These operations should not be included in the result. + equalTo(firstBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)).collect(Collectors.toList()))); + primaryTerm = randomLongBetween(primaryTerm, primaryTerm + 10); + final IndexShard newPrimary = reinitShard(oldPrimary); + DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + newPrimary.markAsRecovering("store", new RecoveryState(newPrimary.routingEntry(), localNode, null)); + assertTrue(newPrimary.recoverFromStore()); + IndexShardTestCase.updateRoutingEntry(newPrimary, newPrimary.routingEntry().moveToStarted()); + newPrimary.advanceMaxSeqNoOfUpdatesOrDeletes(seqno); + // The second bulk includes some operations from the first bulk which were processed already; + // only a subset of these operations will be included the result but with the old primary term. final List existingOps = randomSubsetOf(firstBulk); - final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), - primary.getHistoryUUID(), Stream.concat(existingOps.stream(), secondBulk.stream()).collect(Collectors.toList()), - seqno, primary, logger); - assertThat(partialResult.replicaRequest().getOperations(), - equalTo(rewriteWithPrimaryTerm(secondBulk, primary.getOperationPrimaryTerm()))); - assertThat(partialResult.waitingForGlobalCheckpoint, - equalTo(existingOps.stream().mapToLong(Translog.Operation::seqNo).max().orElse(-2L))); - - closeShards(primary); - } - - private List rewriteWithPrimaryTerm(List sourceOperations, long primaryTerm) { - return sourceOperations.stream().map(op -> { - switch (op.opType()) { - case INDEX: - final Translog.Index index = (Translog.Index) op; - return new Translog.Index(index.type(), index.id(), index.seqNo(), primaryTerm, - index.version(), BytesReference.toBytes(index.source()), index.routing(), index.getAutoGeneratedIdTimestamp()); - case DELETE: - final Translog.Delete delete = (Translog.Delete) op; - return new Translog.Delete(delete.type(), delete.id(), delete.uid(), delete.seqNo(), primaryTerm, delete.version()); - case NO_OP: - final Translog.NoOp noOp = (Translog.NoOp) op; - return new Translog.NoOp(noOp.seqNo(), primaryTerm, noOp.reason()); - default: - throw new IllegalStateException("unexpected operation type [" + op.opType() + "]"); - } - }).collect(Collectors.toList()); + final CcrWritePrimaryResult partialResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(newPrimary.shardId(), + newPrimary.getHistoryUUID(), Stream.concat(secondBulk.stream(), existingOps.stream()).collect(Collectors.toList()), + seqno, newPrimary, logger); + final long newPrimaryTerm = newPrimary.getOperationPrimaryTerm(); + final long globalCheckpoint = newPrimary.getGlobalCheckpoint(); + final List appliedOperations = Stream.concat( + secondBulk.stream().map(op -> rewriteOperationWithPrimaryTerm(op, newPrimaryTerm)), + existingOps.stream().filter(op -> op.seqNo() > globalCheckpoint).map(op -> rewriteOperationWithPrimaryTerm(op, oldPrimaryTerm)) + ).collect(Collectors.toList()); + + assertThat(partialResult.replicaRequest().getOperations(), equalTo(appliedOperations)); + closeShards(newPrimary); } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index ec59e4c5b1d31..5d27c786ad478 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -46,8 +47,10 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -67,6 +70,7 @@ public class FollowingEngineTests extends ESTestCase { private Index index; private ShardId shardId; private AtomicLong primaryTerm = new AtomicLong(); + private AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); public void setUp() throws Exception { super.setUp(); @@ -260,7 +264,7 @@ public void onFailedEngine(String reason, Exception e) { Collections.emptyList(), null, new NoneCircuitBreakerService(), - () -> SequenceNumbers.NO_OPS_PERFORMED, + globalCheckpoint::longValue, () -> primaryTerm.get(), EngineTestCase.tombstoneDocSupplier() ); @@ -555,13 +559,16 @@ public void close() throws IOException { public void testProcessOnceOnPrimary() throws Exception { final Settings settings = Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) - .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true).build(); + .put("index.version.created", Version.CURRENT).put("index.xpack.ccr.following_index", true) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true).build(); final IndexMetaData indexMetaData = IndexMetaData.builder(index.getName()).settings(settings).build(); final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings); + final CheckedFunction nestedDocFactory = EngineTestCase.nestedParsedDocFactory(); int numOps = between(10, 100); List operations = new ArrayList<>(numOps); for (int i = 0; i < numOps; i++) { - ParsedDocument doc = EngineTestCase.createParsedDoc(Integer.toString(between(1, 100)), null); + String docId = Integer.toString(between(1, 100)); + ParsedDocument doc = randomBoolean() ? EngineTestCase.createParsedDoc(docId, null) : nestedDocFactory.apply(docId); if (randomBoolean()) { operations.add(new Engine.Index(EngineTestCase.newUid(doc), doc, i, primaryTerm.get(), 1L, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, threadPool.relativeTimeInMillis(), -1, true)); @@ -571,24 +578,39 @@ public void testProcessOnceOnPrimary() throws Exception { } } Randomness.shuffle(operations); + final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + primaryTerm.set(oldTerm); try (Store store = createStore(shardId, indexSettings, newDirectory())) { final EngineConfig engineConfig = engineConfig(shardId, indexSettings, threadPool, store, logger, xContentRegistry()); try (FollowingEngine followingEngine = createEngine(store, engineConfig)) { followingEngine.advanceMaxSeqNoOfUpdatesOrDeletes(operations.size() - 1L); - final long oldTerm = randomLongBetween(1, Integer.MAX_VALUE); + final Map operationWithTerms = new HashMap<>(); for (Engine.Operation op : operations) { - Engine.Result result = applyOperation(followingEngine, op, oldTerm, randomFrom(Engine.Operation.Origin.values())); + long term = randomLongBetween(1, oldTerm); + Engine.Result result = applyOperation(followingEngine, op, term, randomFrom(Engine.Operation.Origin.values())); assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); + operationWithTerms.put(op.seqNo(), term); + if (rarely()) { + followingEngine.refresh("test"); + } } // Primary should reject duplicates + globalCheckpoint.set(randomLongBetween(globalCheckpoint.get(), followingEngine.getLocalCheckpoint())); final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); for (Engine.Operation op : operations) { Engine.Result result = applyOperation(followingEngine, op, newTerm, Engine.Operation.Origin.PRIMARY); assertThat(result.getResultType(), equalTo(Engine.Result.Type.FAILURE)); assertThat(result.getFailure(), instanceOf(AlreadyProcessedFollowingEngineException.class)); + AlreadyProcessedFollowingEngineException failure = (AlreadyProcessedFollowingEngineException) result.getFailure(); + if (op.seqNo() <= globalCheckpoint.get()) { + assertThat("should not look-up term for operations at most the global checkpoint", + failure.getExistingPrimaryTerm().isPresent(), equalTo(false)); + } else { + assertThat(failure.getExistingPrimaryTerm().getAsLong(), equalTo(operationWithTerms.get(op.seqNo()))); + } } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } // Replica should accept duplicates primaryTerm.set(newTerm); @@ -600,7 +622,7 @@ public void testProcessOnceOnPrimary() throws Exception { assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS)); } for (DocIdSeqNoAndTerm docId : getDocIds(followingEngine, true)) { - assertThat(docId.getPrimaryTerm(), equalTo(oldTerm)); + assertThat(docId.getPrimaryTerm(), equalTo(operationWithTerms.get(docId.getSeqNo()))); } } }