diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index 4899e65f60694..ad6ff12c1d1e9 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -372,9 +372,16 @@ public LongSupplier getPrimaryTermSupplier() {
      * A supplier that provides tombstone documents to be used in soft-update methods.
      * The returned document consists of only the _uid, _seqno, _term and _version fields; other metadata fields are excluded.
      */
-    @FunctionalInterface
     public interface TombstoneDocSupplier {
-        ParsedDocument newTombstoneDoc(String type, String id);
+        /**
+         * Creates a tombstone document for a delete operation.
+         */
+        ParsedDocument newDeleteTombstoneDoc(String type, String id);
+
+        /**
+         * Creates a tombstone document for a noop operation.
+         */
+        ParsedDocument newNoopTombstoneDoc();
     }
 
     public TombstoneDocSupplier getTombstoneDocSupplier() {
diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 88f271914239a..5c9ba8429a1fb 100644
--- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -784,7 +784,9 @@ public IndexResult index(Index index) throws IOException {
                     location = translog.add(new Translog.Index(index, indexResult));
                 } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                     // if we have a document failure, record it as a no-op in the translog with the generated seq_no
-                    location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
+                    final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
+                        index.startTime(), indexResult.getFailure().getMessage());
+                    location = innerNoOp(noOp).getTranslogLocation();
                 } else {
                     location = null;
                 }
@@ -1226,7 +1228,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
         throws IOException {
         try {
             if (softDeleteEnabled) {
-                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
+                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
                 assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
                 tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
                 tombstone.version().setLongValue(plan.versionOfDeletion);
@@ -1334,7 +1336,21 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
         assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
-            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
+            Exception failure = null;
+            if (softDeleteEnabled) {
+                try {
+                    final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc();
+                    tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
+                    assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
+                    addStaleDocs(tombstone.docs(), indexWriter);
+                } catch (Exception ex) {
+                    if (indexWriter.getTragicException() != null) {
+                        throw ex;
+                    }
+                    failure = ex;
+                }
+            }
+            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo(), failure);
             if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
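Note (illustrative, not part of the patch): the delete path above relies on Lucene soft updates. A minimal sketch of how a delete tombstone typically reaches the index, where softDeletesField stands in for the engine's configured soft-deletes field:

// Illustrative sketch: indexing a delete tombstone under soft deletes.
ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(type, id);
tombstone.updateSeqID(seqNo, primaryTerm);       // stamp the operation's _seq_no/_primary_term
tombstone.version().setLongValue(version);       // stamp the operation's _version
ParseContext.Document doc = tombstone.docs().get(0);
doc.add(softDeletesField);                       // the tombstone itself is born soft-deleted
indexWriter.softUpdateDocument(                  // soft-deletes prior copies of the same _id atomically
    new Term(IdFieldMapper.NAME, Uid.encodeId(id)), doc, softDeletesField);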
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
index 7a4672f68e66a..3dea87b2112a7 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
@@ -124,7 +124,6 @@ public DocumentMapper build(MapperService mapperService) {
     private final Map<String, ObjectMapper> objectMappers;
 
     private final boolean hasNestedObjects;
-    private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
 
     public DocumentMapper(MapperService mapperService, Mapping mapping) {
         this.mapperService = mapperService;
@@ -133,10 +132,6 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         final IndexSettings indexSettings = mapperService.getIndexSettings();
         this.mapping = mapping;
         this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
-        final Collection<String> tombstoneFields =
-            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
-        this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
-            .filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
 
         // collect all the mappers for this type
         List<ObjectMapper> newObjectMappers = new ArrayList<>();
@@ -251,9 +246,23 @@ public ParsedDocument parse(SourceToParse source) throws MapperParsingException {
         return documentParser.parseDocument(source, mapping.metadataMappers);
     }
 
-    public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
+    public ParsedDocument createDeleteTombstoneDoc(String index, String type, String id) throws MapperParsingException {
         final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
-        return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
+        final Collection<String> deleteFields = Arrays.asList(VersionFieldMapper.NAME, IdFieldMapper.NAME, TypeFieldMapper.NAME,
+            SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        final MetadataFieldMapper[] deleteFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> deleteFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        return documentParser.parseDocument(emptySource, deleteFieldMappers).toTombstone();
+    }
+
+    public ParsedDocument createNoopTombstoneDoc(String index) throws MapperParsingException {
+        final String id = ""; // _id won't be used.
+        final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
+        final Collection<String> noopFields =
+            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        final MetadataFieldMapper[] noopFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> noopFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        return documentParser.parseDocument(emptySource, noopFieldMappers).toTombstone();
     }
 
     /**
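Both factory methods parse an empty source with a filtered set of metadata mappers and then mark the result via toTombstone(). A hedged sketch of what each variant yields — docMapper here is a hypothetical DocumentMapper instance; the authoritative assertions are in IndexShardTests below:

// Hedged sketch: the field sets of the two tombstone variants.
ParsedDocument delete = docMapper.createDeleteTombstoneDoc("index", "doc", "1");
assert delete.rootDoc().getField(IdFieldMapper.NAME) != null;               // identifies the deleted document
assert delete.rootDoc().getField(VersionFieldMapper.NAME) != null;
assert delete.rootDoc().getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null;  // set to 1 by toTombstone()

ParsedDocument noop = docMapper.createNoopTombstoneDoc("index");
assert noop.rootDoc().getField(IdFieldMapper.NAME) == null;                 // a noop has no document identity
assert noop.rootDoc().getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null;    // only seq_no, term and tombstone fields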
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
index 414cb3a98ecab..7e16aa4fb5f94 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
@@ -83,6 +83,13 @@ public void updateSeqID(long sequenceNumber, long primaryTerm) {
         this.seqID.primaryTerm.setLongValue(primaryTerm);
     }
 
+    ParsedDocument toTombstone() {
+        assert docs().size() == 1 : "Tombstone should have a single doc [" + docs() + "]";
+        this.seqID.tombstoneField.setLongValue(1);
+        rootDoc().add(this.seqID.tombstoneField);
+        return this;
+    }
+
     public String routing() {
         return this.routing;
     }
diff --git a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
index ac3ffe4627238..5a0db4163bf28 100644
--- a/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
+++ b/server/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java
@@ -69,26 +69,29 @@ public static class SequenceIDFields {
 
         public final Field seqNo;
         public final Field seqNoDocValue;
         public final Field primaryTerm;
+        public final Field tombstoneField;
 
-        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
+        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm, Field tombstoneField) {
             Objects.requireNonNull(seqNo, "sequence number field cannot be null");
             Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
             Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
             this.seqNo = seqNo;
             this.seqNoDocValue = seqNoDocValue;
             this.primaryTerm = primaryTerm;
+            this.tombstoneField = tombstoneField;
         }
 
         public static SequenceIDFields emptySeqID() {
             return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
                 new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
-                new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
+                new NumericDocValuesField(PRIMARY_TERM_NAME, 0), new NumericDocValuesField(TOMBSTONE_NAME, 0));
         }
     }
 
     public static final String NAME = "_seq_no";
     public static final String CONTENT_TYPE = "_seq_no";
     public static final String PRIMARY_TERM_NAME = "_primary_term";
+    public static final String TOMBSTONE_NAME = "_tombstone";
 
     public static class SeqNoDefaults {
         public static final String NAME = SeqNoFieldMapper.NAME;
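Since _tombstone is an ordinary numeric docvalues field, a reader can classify documents per segment without any extra index structure; a minimal sketch of that check (the test framework's isTombstoneOperation below does exactly this):

// Minimal sketch: classifying a document via the _tombstone docvalues of its segment.
NumericDocValues tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
boolean isTombstone = tombstoneDV != null
    && tombstoneDV.advanceExact(segmentDocId)   // the field is absent on regular documents
    && tombstoneDV.longValue() == 1;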
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 63db5912c37b4..e110bbdf04ed2 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -90,12 +90,14 @@
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.get.ShardGetService;
+import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapperForType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
@@ -2158,8 +2160,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
-            this::createTombstoneDoc);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm, tombstoneDocSupplier());
     }
 
     /**
@@ -2588,7 +2589,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
         }
     }
 
-    private ParsedDocument createTombstoneDoc(String type, String id) {
-        return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
+    private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
+        return new EngineConfig.TombstoneDocSupplier() {
+            @Override
+            public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
+                return docMapper(type).getDocumentMapper().createDeleteTombstoneDoc(shardId.getIndexName(), type, id);
+            }
+            @Override
+            public ParsedDocument newNoopTombstoneDoc() {
+                // a noop tombstone carries no type-specific fields, so a minimal synthetic mapper suffices
+                final RootObjectMapper.Builder rootMapper = new RootObjectMapper.Builder("__noop");
+                final DocumentMapper documentMapper = new DocumentMapper.Builder(rootMapper, mapperService).build(mapperService);
+                return documentMapper.createNoopTombstoneDoc(shardId.getIndexName());
+            }
+        };
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
index 407212936d1d6..4af5d5b9a8535 100644
--- a/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
+++ b/server/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
@@ -391,6 +391,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
                 assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
             }
         }, 30, TimeUnit.SECONDS);
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
    }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index a61f2b462f616..09d8d5f76116a 100644
--- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -103,6 +103,7 @@
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper.BuilderContext;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -173,6 +174,7 @@
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -2603,7 +2605,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
                 new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
                 IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
                 config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
-                new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
+                new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
 
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
             fail("translog belongs to a different engine");
@@ -3046,7 +3048,8 @@ public void testDoubleDeliveryReplica() throws IOException {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
             assertEquals(1, topDocs.totalHits);
         }
-        assertThat(getOperationSeqNoInLucene(engine), contains(20L));
+        List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService("test"));
+        assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
     }
 
     public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
@@ -3606,7 +3609,9 @@ public void testNoOps() throws IOException {
                 maxSeqNo,
                 localCheckpoint);
             trimUnsafeCommits(engine.config());
-            noOpEngine = new InternalEngine(engine.config(), supplier) {
+            EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD,
+                () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
+            noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
                 @Override
                 protected long doGenerateSeqNoForOperation(Operation operation) {
                     throw new UnsupportedOperationException();
@@ -3636,6 +3641,13 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
             assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
             assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
             assertThat(noOp.reason(), equalTo(reason));
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
+            assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // fills the seq_no gaps, plus the two manual noops
+            for (int i = 0; i < operationsFromLucene.size(); i++) {
+                assertThat(operationsFromLucene.get(i), equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "")));
+            }
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
         } finally {
             IOUtils.close(noOpEngine);
         }
@@ -4603,7 +4615,10 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) throws IOException {
                 engine.forceMerge(true);
             }
         }
-        assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
+        MapperService mapperService = createMapperService("test");
+        List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
+        assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+        assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
     }
 }
diff --git a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
index 2d2aaac7bbd26..31b79e1ba2f16 100644
--- a/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
+++ b/server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
@@ -245,7 +245,11 @@ public void testDocumentFailureReplication() throws Exception {
         try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
             @Override
             protected EngineFactory getEngineFactory(ShardRouting routing) {
-                return throwingDocumentFailureEngineFactory;
+                if (routing.primary()) {
+                    return throwingDocumentFailureEngineFactory; // simulate a document failure on the primary only
+                } else {
+                    return InternalEngine::new;
+                }
             }}) {
 
             // test only primary
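Why primary-only: with the InternalEngine#index change above, a document failure that has already consumed a sequence number on the primary reaches replicas as a no-op, so replicas need a regular engine to apply it. A rough, illustrative sketch of that flow (names and control flow are a simplification, not the replication layer's actual code):

// Rough sketch: how a primary-side document failure becomes a replica no-op.
Engine.IndexResult result = primaryEngine.index(indexOp);   // fails, but a seq_no was assigned
if (result.getFailure() != null && result.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
    replicaEngine.noOp(new Engine.NoOp(result.getSeqNo(), primaryTerm,
        Engine.Operation.Origin.REPLICA, System.nanoTime(), result.getFailure().getMessage()));
}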
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 0c394945bcbf3..583d6f3d45c1a 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -77,6 +77,7 @@
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.engine.Segment;
@@ -3081,13 +3082,23 @@ public void onShardInactive(IndexShard indexShard) {
     public void testSupplyTombstoneDoc() throws Exception {
         IndexShard shard = newStartedShard();
         String id = randomRealisticUnicodeOfLengthBetween(1, 10);
-        ParsedDocument tombstone = shard.getEngine().config().getTombstoneDocSupplier().newTombstoneDoc("doc", id);
-        assertThat(tombstone.docs(), hasSize(1));
-        ParseContext.Document doc = tombstone.docs().get(0);
-        assertThat(doc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
+        ParsedDocument deleteTombstone = shard.getEngine().config().getTombstoneDocSupplier().newDeleteTombstoneDoc("doc", id);
+        assertThat(deleteTombstone.docs(), hasSize(1));
+        ParseContext.Document deleteDoc = deleteTombstone.docs().get(0);
+        assertThat(deleteDoc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
+            containsInAnyOrder(IdFieldMapper.NAME, VersionFieldMapper.NAME,
+                SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME));
+        assertThat(deleteDoc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
+        assertThat(deleteDoc.getField(SeqNoFieldMapper.TOMBSTONE_NAME).numericValue().longValue(), equalTo(1L));
+
+        ParsedDocument noopTombstone = shard.getEngine().config().getTombstoneDocSupplier().newNoopTombstoneDoc();
+        assertThat(noopTombstone.docs(), hasSize(1));
+        ParseContext.Document noopDoc = noopTombstone.docs().get(0);
+        assertThat(noopDoc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
             containsInAnyOrder(SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME,
-                IdFieldMapper.NAME, VersionFieldMapper.NAME));
-        assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
+                SeqNoFieldMapper.TOMBSTONE_NAME));
+        assertThat(noopDoc.getField(SeqNoFieldMapper.TOMBSTONE_NAME).numericValue().longValue(), equalTo(1L));
+
         closeShards(shard);
     }
diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
index 7fbf86eb5738a..e7109979332b4 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@@ -132,7 +132,7 @@
             indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
             eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, (e, s) -> 0,
             new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
-            EngineTestCase::createTombstoneDoc);
+            EngineTestCase.tombstoneDocSupplier());
         engine = new InternalEngine(config);
         engine.recoverFromTranslog();
         listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
index cf1449fecd6a5..f72c30d9f4621 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@@ -66,6 +66,7 @@
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
+import org.junit.After;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -109,6 +110,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
             RecoverySettingsChunkSizePlugin.class);
     }
 
+    @After
+    public void assertConsistentHistoryInLuceneIndex() throws Exception {
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
+    }
+
     private void assertRecoveryStateWithoutStage(RecoveryState state, int shardId, RecoverySource recoverySource, boolean primary,
                                                  String sourceNode, String targetNode) {
         assertThat(state.getShardId().getId(), equalTo(shardId));
diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 45110ee6a2d15..70591e06d1084 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -27,6 +27,7 @@
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.junit.After;
 
 import java.io.IOException;
 import java.nio.file.FileVisitResult;
@@ -58,6 +59,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
         return Arrays.asList(MockRepository.Plugin.class);
     }
 
+    @After
+    public void assertConsistentHistoryInLuceneIndex() throws Exception {
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
+    }
+
     public static long getFailureCount(String repository) {
         long failureCount = 0;
         for (RepositoriesService repositoriesService :
diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
index 57ebf90efcf12..38f518afa898e 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java
@@ -39,6 +39,8 @@
 import org.apache.lucene.search.ReferenceManager;
 import org.apache.lucene.search.ScoreDoc;
 import org.apache.lucene.search.Sort;
+import org.apache.lucene.search.SortField;
+import org.apache.lucene.search.SortedNumericSortField;
 import org.apache.lucene.search.TermQuery;
 import org.apache.lucene.search.TopDocs;
 import org.apache.lucene.search.TotalHitCountCollector;
@@ -46,6 +48,7 @@
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.common.Nullable;
@@ -56,13 +59,17 @@
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.MapperTestUtils;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
 import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
@@ -93,9 +100,9 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
@@ -196,8 +203,7 @@ public EngineConfig copy(EngineConfig config, LongSupplier globalCheckpointSupplier) {
             new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
             config.getTranslogConfig(), config.getFlushMergesAfter(),
             config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
-            config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(),
-            EngineTestCase::createTombstoneDoc);
+            config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(), tombstoneDocSupplier());
     }
 
     public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
@@ -210,15 +216,27 @@ public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
             config.getTombstoneDocSupplier());
     }
 
+    public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
+        return new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
+            config.getWarmer(), config.getStore(), mergePolicy, config.getAnalyzer(), config.getSimilarity(),
+            new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
+            config.getTranslogConfig(), config.getFlushMergesAfter(),
+            config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
+            config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(),
+            config.getTombstoneDocSupplier());
+    }
+
     @Override
     @After
     public void tearDown() throws Exception {
         super.tearDown();
         if (engine != null && engine.isClosed.get() == false) {
             engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, createMapperService("test"));
         }
         if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
             replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(replicaEngine, createMapperService("test"));
         }
         IOUtils.close(
             replicaEngine, storeReplica,
@@ -265,18 +283,37 @@ protected static ParsedDocument testParsedDocument(
     /**
      * Creates a tombstone-document supplier; each returned document includes only the uid, seq#, term, version and tombstone fields.
      */
-    public static ParsedDocument createTombstoneDoc(String type, String id) {
-        final ParseContext.Document document = new ParseContext.Document();
-        Field uidField = new Field(IdFieldMapper.NAME, Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE);
-        document.add(uidField);
-        Field versionField = new NumericDocValuesField(VersionFieldMapper.NAME, 0);
-        document.add(versionField);
-        SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
-        document.add(seqID.seqNo);
-        document.add(seqID.seqNoDocValue);
-        document.add(seqID.primaryTerm);
-        return new ParsedDocument(versionField, seqID, id, type, null, Collections.singletonList(document),
-            new BytesArray("{}"), XContentType.JSON, null);
+    public static EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
+        return new EngineConfig.TombstoneDocSupplier() {
+            @Override
+            public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
+                final ParseContext.Document doc = new ParseContext.Document();
+                Field uidField = new Field(IdFieldMapper.NAME, Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE);
+                doc.add(uidField);
+                Field versionField = new NumericDocValuesField(VersionFieldMapper.NAME, 0);
+                doc.add(versionField);
+                SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
+                doc.add(seqID.seqNo);
+                doc.add(seqID.seqNoDocValue);
+                doc.add(seqID.primaryTerm);
+                seqID.tombstoneField.setLongValue(1);
+                doc.add(seqID.tombstoneField);
+                return new ParsedDocument(versionField, seqID, id, type, null,
+                    Collections.singletonList(doc), new BytesArray("{}"), XContentType.JSON, null);
+            }
+
+            @Override
+            public ParsedDocument newNoopTombstoneDoc() {
+                final ParseContext.Document doc = new ParseContext.Document();
+                SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
+                doc.add(seqID.seqNo);
+                doc.add(seqID.seqNoDocValue);
+                doc.add(seqID.primaryTerm);
+                seqID.tombstoneField.setLongValue(1);
+                doc.add(seqID.tombstoneField);
+                return new ParsedDocument(null, seqID, null, null, null, Collections.singletonList(doc), null, XContentType.JSON, null);
+            }
+        };
     }
 
     protected Store createStore() throws IOException {
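Unlike the shard-level supplier, this test supplier assembles tombstone documents by hand, so engine tests need no MapperService. A brief usage sketch under this patch's API:

// Usage sketch: engine tests stamp seq#/term/version onto hand-built tombstone docs.
EngineConfig.TombstoneDocSupplier supplier = tombstoneDocSupplier();
ParsedDocument deleteTombstone = supplier.newDeleteTombstoneDoc("test", "1");
deleteTombstone.updateSeqID(7, 1);               // seq_no = 7, primary_term = 1
deleteTombstone.version().setLongValue(2);       // version of the delete
assert deleteTombstone.docs().size() == 1;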
@@ -487,7 +524,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
                 new NoneCircuitBreakerService(),
                 globalCheckpointSupplier == null ?
                     new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) :
-                    globalCheckpointSupplier, primaryTerm::get, EngineTestCase::createTombstoneDoc);
+                    globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
         return config;
     }
 
@@ -673,30 +710,119 @@ public static void assertOpsOnReplica(
     }
 
     /**
-     * Returns a list of sequence numbers of all existing documents including soft-deleted documents in Lucene.
+     * Reads all engine operations that have been processed by the engine from the Lucene index.
+     * The returned operations are sorted and de-duplicated, thus each sequence number will have at most one operation.
      */
-    public static Set<Long> getOperationSeqNoInLucene(Engine engine) throws IOException {
+    public static List<Translog.Operation> readAllOperationsInLucene(Engine engine, MapperService mapper) throws IOException {
         engine.refresh("test");
-        final Set<Long> seqNos = new HashSet<>();
+        final List<Translog.Operation> operations = new ArrayList<>();
         try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
-            IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
-            List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
-            NumericDocValues[] seqNoDocValues = new NumericDocValues[leaves.size()];
-            for (int i = 0; i < leaves.size(); i++) {
-                seqNoDocValues[i] = leaves.get(i).reader().getNumericDocValues(SeqNoFieldMapper.NAME);
-            }
-            TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE);
+            final IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
+            final Sort sortedBySeqNoThenByTerm = new Sort(
+                new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
+                new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
+            );
+            final TopDocs allDocs = indexSearcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
+            long lastSeenSeqNo = SequenceNumbers.NO_OPS_PERFORMED;
             for (ScoreDoc scoreDoc : allDocs.scoreDocs) {
-                int leafIndex = ReaderUtil.subIndex(scoreDoc.doc, leaves);
-                int segmentDocId = scoreDoc.doc - leaves.get(leafIndex).docBase;
-                if (seqNoDocValues[leafIndex] != null && seqNoDocValues[leafIndex].advanceExact(segmentDocId)) {
-                    seqNos.add(seqNoDocValues[leafIndex].longValue());
-                } else {
-                    throw new AssertionError("Segment without seqno DocValues");
+                final Translog.Operation op = readOperationInLucene(indexSearcher, mapper, scoreDoc.doc);
+                if (op.seqNo() != lastSeenSeqNo) {
+                    operations.add(op);
+                    lastSeenSeqNo = op.seqNo();
                 }
             }
         }
-        return seqNos;
+        return operations;
+    }
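+
+    // Note on the de-duplication above: after a failover, stale and winning copies of the same
+    // operation can coexist, e.g. both (seq_no=5, term=1) and (seq_no=5, term=2). The sort is
+    // ascending by seq_no and descending by primary term, so (5,2) is visited first and the
+    // lastSeenSeqNo check skips (5,1) -- the operation indexed by the newer primary wins.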
+
+    private static Translog.Operation readOperationInLucene(IndexSearcher searcher, MapperService mapper, int docID) throws IOException {
+        final List<LeafReaderContext> leaves = searcher.getIndexReader().leaves();
+        final int leafIndex = ReaderUtil.subIndex(docID, leaves);
+        final int segmentDocID = docID - leaves.get(leafIndex).docBase;
+        final long seqNo = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.NAME, segmentDocID);
+        final long primaryTerm = readNumericDV(leaves.get(leafIndex), SeqNoFieldMapper.PRIMARY_TERM_NAME, segmentDocID);
+        final FieldsVisitor fields = new FieldsVisitor(true);
+        searcher.doc(docID, fields);
+        fields.postProcess(mapper);
+        final Translog.Operation op;
+        final boolean isTombstone = isTombstoneOperation(leaves.get(leafIndex), segmentDocID);
+        if (isTombstone && fields.uid() == null) {
+            op = new Translog.NoOp(seqNo, primaryTerm, ""); // noop tombstones do not store the reason
+            assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
+                : "Noop operation but soft_deletes field is not set";
+        } else {
+            final String id = fields.uid().id();
+            final String type = fields.uid().type();
+            final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
+            final long version = readNumericDV(leaves.get(leafIndex), VersionFieldMapper.NAME, segmentDocID);
+            if (isTombstone) {
+                op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
+                assert readNumericDV(leaves.get(leafIndex), Lucene.SOFT_DELETE_FIELD, segmentDocID) == 1
+                    : "Delete operation but soft_deletes field is not set";
+            } else {
+                final BytesReference source = fields.source();
+                op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL, source.toBytesRef().bytes,
+                    fields.routing(), -1);
+            }
+        }
+        return op;
+    }
+
+    private static boolean isTombstoneOperation(LeafReaderContext leaf, int segmentDocID) throws IOException {
+        final NumericDocValues tombstoneDV = leaf.reader().getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
+        if (tombstoneDV != null && tombstoneDV.advanceExact(segmentDocID)) {
+            return tombstoneDV.longValue() == 1;
+        }
+        return false;
+    }
+
+    private static long readNumericDV(LeafReaderContext leaf, String field, int segmentDocID) throws IOException {
+        final NumericDocValues dv = leaf.reader().getNumericDocValues(field);
+        if (dv == null || dv.advanceExact(segmentDocID) == false) {
+            throw new IllegalStateException("DocValues for field [" + field + "] is not found");
+        }
+        return dv.longValue();
+    }
+
+    /**
+     * Asserts that the provided engine has a consistent document history between the translog and the Lucene index.
+     */
+    public static void assertConsistentHistoryBetweenTranslogAndLuceneIndex(Engine engine, MapperService mapper) throws IOException {
+        if (mapper.documentMapper() == null || engine.config().getIndexSettings().isSoftDeleteEnabled() == false) {
+            return;
+        }
+        final Map<Long, Translog.Operation> translogOps = new HashMap<>();
+        try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
+            Translog.Operation op;
+            while ((op = snapshot.next()) != null) {
+                translogOps.put(op.seqNo(), op);
+            }
+        }
+        final List<Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapper);
+        for (Translog.Operation luceneOp : luceneOps) {
+            Translog.Operation translogOp = translogOps.get(luceneOp.seqNo());
+            if (translogOp == null) {
+                continue;
+            }
+            assertThat(luceneOp.primaryTerm(), equalTo(translogOp.primaryTerm()));
+            assertThat(luceneOp.opType(), equalTo(translogOp.opType()));
+            if (luceneOp.opType() == Translog.Operation.Type.INDEX) {
+                assertThat(luceneOp.getSource().source, equalTo(translogOp.getSource().source));
+            }
+        }
+    }
+
+    protected MapperService createMapperService(String type) throws IOException {
+        IndexMetaData indexMetaData = IndexMetaData.builder("test")
+            .settings(Settings.builder()
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
+            .putMapping(type, "{\"properties\": {}}")
+            .build();
+        MapperService mapperService = MapperTestUtils.newMapperService(new NamedXContentRegistry(ClusterModule.getNamedXWriteables()),
+            createTempDir(), Settings.EMPTY, "test");
+        mapperService.merge(indexMetaData, MapperService.MergeReason.MAPPING_UPDATE);
+        return mapperService;
+    }
 
     /**
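Taken together, readAllOperationsInLucene and the consistency assertion give tests a reusable post-condition; a hedged sketch of typical use, where expectedSeqNos is whatever sequence numbers the test accumulated:

// Hedged usage sketch: validate an engine's Lucene history at the end of a test.
MapperService mapperService = createMapperService("test");
List<Translog.Operation> luceneOps = readAllOperationsInLucene(engine, mapperService);
assertThat(luceneOps.stream().map(Translog.Operation::seqNo).collect(Collectors.toList()),
    containsInAnyOrder(expectedSeqNos.toArray()));
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);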
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
index 1f15964ed76b0..9524ed8fabadc 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java
@@ -372,6 +372,7 @@ protected void closeShards(Iterable<IndexShard> shards) throws IOException {
         for (IndexShard shard : shards) {
             if (shard != null) {
                 try {
+                    assertConsistentHistoryBetweenTranslogAndLucene(shard);
                     shard.close("test", false);
                 } finally {
                     IOUtils.close(shard.store());
@@ -546,6 +547,12 @@ protected void assertDocs(IndexShard shard, String... ids) throws IOException {
         assertThat(shardDocUIDs, hasSize(ids.length));
     }
 
+    public static void assertConsistentHistoryBetweenTranslogAndLucene(IndexShard shard) throws IOException {
+        final Engine engine = shard.getEngineOrNull();
+        if (engine != null) {
+            EngineTestCase.assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, shard.mapperService());
+        }
+    }
+
     protected Engine.IndexResult indexDoc(IndexShard shard, String type, String id) throws IOException {
         return indexDoc(shard, type, id, "{}");
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 12acd21903ec4..0c1aa41722f36 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -75,6 +75,7 @@
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
@@ -1169,6 +1170,22 @@ private void assertOpenTranslogReferences() throws Exception {
         });
     }
 
+    /**
+     * Asserts that the document history in the Lucene index is consistent with the translog on every index shard of the cluster.
+     * This assertion can be expensive, so we prefer not to run it after every test, only after the interesting ones.
+     */
+    public void assertConsistentHistoryBetweenTranslogAndLuceneIndex() throws IOException {
+        final Collection<NodeAndClient> nodesAndClients = nodes.values();
+        for (NodeAndClient nodeAndClient : nodesAndClients) {
+            IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
+            for (IndexService indexService : indexServices) {
+                for (IndexShard indexShard : indexService) {
+                    IndexShardTestCase.assertConsistentHistoryBetweenTranslogAndLucene(indexShard);
+                }
+            }
+        }
+    }
+
     private void randomlyResetClients() throws IOException {
         // only reset the clients on nightly tests, it causes heavy load...
         if (RandomizedTest.isNightly() && rarely(random)) {
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
index 56ee10a98d1f5..26108dcd9f7c1 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java
@@ -282,7 +282,7 @@ public void onFailedEngine(String reason, Exception e) {
                 new NoneCircuitBreakerService(),
                 () -> SequenceNumbers.NO_OPS_PERFORMED,
                 () -> primaryTerm.get(),
-                EngineTestCase::createTombstoneDoc
+                EngineTestCase.tombstoneDocSupplier()
             );