
Commit 7bfd4d1

Add tombstone document into Lucene for Noop

This commit adds a tombstone document into Lucene for every no-op. With
this change, the Lucene index is expected to have a complete history of
operations, like the translog. In practice, this guarantee is subject to
the soft-deletes retention merge policy.
dnhatn committed Apr 28, 2018
1 parent 8ebca76 commit 7bfd4d1
Showing 17 changed files with 312 additions and 65 deletions.
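
To make the change concrete: at the Lucene level, a no-op tombstone is a minimal document carrying only sequence-number metadata, soft-deleted from the moment it is indexed. The following sketch is an illustration assembled from the field definitions in this diff, not code from the commit:

    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.LongPoint;
    import org.apache.lucene.document.NumericDocValuesField;

    // Illustration only: field names are those defined in SeqNoFieldMapper below.
    static Document newNoopTombstone(long seqNo, long primaryTerm) {
        Document doc = new Document();
        doc.add(new LongPoint("_seq_no", seqNo));                 // indexed point, enables seq_no range queries
        doc.add(new NumericDocValuesField("_seq_no", seqNo));     // doc-values copy of the same value
        doc.add(new NumericDocValuesField("_primary_term", primaryTerm));
        doc.add(new NumericDocValuesField("_tombstone", 1));      // marks the document as a tombstone
        return doc;
    }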

@@ -372,9 +372,16 @@ public LongSupplier getPrimaryTermSupplier() {
      * A supplier supplies tombstone documents which will be used in soft-update methods.
      * The returned document consists only _uid, _seqno, _term and _version fields; other metadata fields are excluded.
      */
-    @FunctionalInterface
     public interface TombstoneDocSupplier {
-        ParsedDocument newTombstoneDoc(String type, String id);
+        /**
+         * Creates a tombstone document for a delete operation.
+         */
+        ParsedDocument newDeleteTombstoneDoc(String type, String id);
+
+        /**
+         * Creates a tombstone document for a noop operation.
+         */
+        ParsedDocument newNoopTombstoneDoc();
     }
 
     public TombstoneDocSupplier getTombstoneDocSupplier() {
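
A caller-side sketch of how the engine uses this supplier (hypothetical usage, not code from the commit; see deleteInLucene and innerNoOp below). Per the DocumentMapper changes further down, a delete tombstone carries the _id, _type, _version, _seq_no, _primary_term and _tombstone fields, while a noop tombstone carries only _seq_no, _primary_term and _tombstone:

    // Hypothetical consumer: obtain a tombstone, stamp its seq_no/term, then index it.
    EngineConfig.TombstoneDocSupplier supplier = engineConfig.getTombstoneDocSupplier();

    ParsedDocument deleteTombstone = supplier.newDeleteTombstoneDoc("doc", "some-id");
    deleteTombstone.updateSeqID(seqNo, primaryTerm);

    ParsedDocument noopTombstone = supplier.newNoopTombstoneDoc();
    noopTombstone.updateSeqID(seqNo, primaryTerm);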

@@ -784,7 +784,9 @@ public IndexResult index(Index index) throws IOException {
                     location = translog.add(new Translog.Index(index, indexResult));
                 } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                     // if we have document failure, record it as a no-op in the translog with the generated seq_no
-                    location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
+                    final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
+                        index.startTime(), indexResult.getFailure().getMessage());
+                    location = innerNoOp(noOp).getTranslogLocation();
                 } else {
                     location = null;
                 }

@@ -1226,7 +1228,7 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
         throws IOException {
         try {
             if (softDeleteEnabled) {
-                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
+                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
                 assert tombstone.docs().size() == 1 : "Tombstone doc should have single doc [" + tombstone + "]";
                 tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
                 tombstone.version().setLongValue(plan.versionOfDeletion);

@@ -1334,7 +1336,21 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
         assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
-            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
+            Exception failure = null;
+            if (softDeleteEnabled) {
+                try {
+                    final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc();
+                    tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
+                    assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
+                    addStaleDocs(tombstone.docs(), indexWriter);
+                } catch (Exception ex) {
+                    if (indexWriter.getTragicException() != null) {
+                        throw ex;
+                    }
+                    failure = ex;
+                }
+            }
+            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo(), failure);
             if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
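
The tombstone is indexed through addStaleDocs, whose body is outside the visible hunks. A minimal sketch of what such a helper plausibly does, assuming the document is added to the IndexWriter already marked with the soft-deletes field so it never becomes a live, searchable document (an assumption, not the committed implementation):

    // Sketch under assumptions: index documents pre-marked as soft-deleted.
    // Lucene.SOFT_DELETE_FIELD is the soft-deletes field referenced elsewhere in this diff.
    private void addStaleDocs(List<ParseContext.Document> docs, IndexWriter indexWriter) throws IOException {
        for (ParseContext.Document doc : docs) {
            doc.add(new NumericDocValuesField(Lucene.SOFT_DELETE_FIELD, 1)); // created as soft-deleted
        }
        indexWriter.addDocuments(docs);
    }

Note also the error handling above: if indexing the tombstone fails but the writer has no tragic exception, the failure is captured in the NoOpResult instead of being rethrown, so the translog no-op is still written.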

@@ -124,7 +124,6 @@ public DocumentMapper build(MapperService mapperService) {
     private final Map<String, ObjectMapper> objectMappers;
 
     private final boolean hasNestedObjects;
-    private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
 
     public DocumentMapper(MapperService mapperService, Mapping mapping) {
         this.mapperService = mapperService;

@@ -133,10 +132,6 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         final IndexSettings indexSettings = mapperService.getIndexSettings();
         this.mapping = mapping;
         this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
-        final Collection<String> tombstoneFields =
-            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
-        this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
-            .filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
 
         // collect all the mappers for this type
         List<ObjectMapper> newObjectMappers = new ArrayList<>();

@@ -251,9 +246,23 @@ public ParsedDocument parse(SourceToParse source) throws MapperParsingException
         return documentParser.parseDocument(source, mapping.metadataMappers);
     }
 
-    public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
+    public ParsedDocument createDeleteTombstoneDoc(String index, String type, String id) throws MapperParsingException {
         final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
-        return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
+        final Collection<String> deleteFields = Arrays.asList(VersionFieldMapper.NAME, IdFieldMapper.NAME, TypeFieldMapper.NAME,
+            SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        final MetadataFieldMapper[] deleteFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> deleteFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        return documentParser.parseDocument(emptySource, deleteFieldMappers).toTombstone();
     }
+
+    public ParsedDocument createNoopTombstoneDoc(String index) throws MapperParsingException {
+        final String id = ""; // _id won't be used.
+        final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
+        final Collection<String> noopFields =
+            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        final MetadataFieldMapper[] noopFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> noopFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        return documentParser.parseDocument(emptySource, noopFieldMappers).toTombstone();
+    }
 
     /**

@@ -83,6 +83,13 @@ public void updateSeqID(long sequenceNumber, long primaryTerm) {
         this.seqID.primaryTerm.setLongValue(primaryTerm);
     }
 
+    ParsedDocument toTombstone() {
+        assert docs().size() == 1 : "Tombstone should have a single doc [" + docs() + "]";
+        this.seqID.tombstoneField.setLongValue(1);
+        rootDoc().add(this.seqID.tombstoneField);
+        return this;
+    }
+
     public String routing() {
         return this.routing;
     }

@@ -69,26 +69,29 @@ public static class SequenceIDFields {
         public final Field seqNo;
         public final Field seqNoDocValue;
         public final Field primaryTerm;
+        public final Field tombstoneField;
 
-        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
+        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm, Field tombstoneField) {
             Objects.requireNonNull(seqNo, "sequence number field cannot be null");
             Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
             Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
             this.seqNo = seqNo;
             this.seqNoDocValue = seqNoDocValue;
             this.primaryTerm = primaryTerm;
+            this.tombstoneField = tombstoneField;
         }
 
         public static SequenceIDFields emptySeqID() {
             return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
                 new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
-                new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
+                new NumericDocValuesField(PRIMARY_TERM_NAME, 0), new NumericDocValuesField(TOMBSTONE_NAME, 0));
         }
     }
 
     public static final String NAME = "_seq_no";
     public static final String CONTENT_TYPE = "_seq_no";
     public static final String PRIMARY_TERM_NAME = "_primary_term";
+    public static final String TOMBSTONE_NAME = "_tombstone";
 
     public static class SeqNoDefaults {
         public static final String NAME = SeqNoFieldMapper.NAME;
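
Because _tombstone exists only as doc values, a reader distinguishing tombstones from regular documents consults the per-segment NumericDocValues. A hedged sketch of such a check (illustrative, not code from the commit):

    import org.apache.lucene.index.LeafReader;
    import org.apache.lucene.index.NumericDocValues;

    // Illustration: true if the given doc in this segment is a delete or noop tombstone.
    static boolean isTombstone(LeafReader reader, int docId) throws IOException {
        NumericDocValues dv = reader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
        return dv != null && dv.advanceExact(docId) && dv.longValue() == 1;
    }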
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java (20 changes: 16 additions & 4 deletions)

@@ -90,12 +90,14 @@
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.get.ShardGetService;
+import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapperForType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;

@@ -2158,8 +2160,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
-            this::createTombstoneDoc);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm, tombstoneDocSupplier());
     }
 
     /**

@@ -2588,7 +2589,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
         }
     }
 
-    private ParsedDocument createTombstoneDoc(String type, String id) {
-        return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
+    private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
+        return new EngineConfig.TombstoneDocSupplier() {
+            @Override
+            public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
+                return docMapper(type).getDocumentMapper().createDeleteTombstoneDoc(shardId.getIndexName(), type, id);
+            }
+            @Override
+            public ParsedDocument newNoopTombstoneDoc() {
+                final RootObjectMapper.Builder rootMapper = new RootObjectMapper.Builder("__noop");
+                final DocumentMapper documentMapper = new DocumentMapper.Builder(rootMapper, mapperService).build(mapperService);
+                return documentMapper.createNoopTombstoneDoc(shardId.getIndexName());
+            }
+        };
     }
 }

@@ -391,6 +391,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
                 assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
             }
         }, 30, TimeUnit.SECONDS);
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
     }
 
 }

@@ -103,6 +103,7 @@
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper.BuilderContext;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;

@@ -173,6 +174,7 @@
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;

@@ -2603,7 +2605,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
             new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
             config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
-            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
+            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
             fail("translog belongs to a different engine");

@@ -3046,7 +3048,8 @@ public void testDoubleDeliveryReplica() throws IOException {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
             assertEquals(1, topDocs.totalHits);
         }
-        assertThat(getOperationSeqNoInLucene(engine), contains(20L));
+        List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService("test"));
+        assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
     }
 
     public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {

@@ -3606,7 +3609,9 @@ public void testNoOps() throws IOException {
             maxSeqNo,
             localCheckpoint);
         trimUnsafeCommits(engine.config());
-        noOpEngine = new InternalEngine(engine.config(), supplier) {
+        EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD,
+            () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
+        noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
             @Override
             protected long doGenerateSeqNoForOperation(Operation operation) {
                 throw new UnsupportedOperationException();

@@ -3636,6 +3641,13 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
         assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
         assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
         assertThat(noOp.reason(), equalTo(reason));
+        MapperService mapperService = createMapperService("test");
+        List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
+        assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // fills n gap and 2 manual noop.
+        for (int i = 0; i < operationsFromLucene.size(); i++) {
+            assertThat(operationsFromLucene.get(i), equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "")));
+        }
+        assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
     } finally {
         IOUtils.close(noOpEngine);
     }
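
To unpack the expected size: Lucene should hold one no-op for every sequence number from localCheckpoint + 1 through maxSeqNo + 2 (the gap-filling no-ops plus the two no-ops this test issues directly), hence maxSeqNo + 2 - localCheckpoint operations. For example, with localCheckpoint = 5 and maxSeqNo = 8, the history contains no-ops at seq# 6 through 10, i.e. 8 + 2 - 5 = 5 operations.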

@@ -4603,7 +4615,10 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
                 engine.forceMerge(true);
             }
         }
-        assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
+        MapperService mapperService = createMapperService("test");
+        List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
+        assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+        assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
     }
 }
 
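
readAllOperationsInLucene and assertConsistentHistoryBetweenTranslogAndLuceneIndex are new EngineTestCase helpers whose bodies are not in the visible hunks. Conceptually the reader must see soft-deleted documents as well as live ones; iterating doc values directly over each segment, without consulting liveDocs, achieves that. A loose sketch under those assumptions (not the committed helper):

    // Loose sketch: collect the seq_no of every document, including soft-deleted tombstones.
    static List<Long> allSeqNosInLucene(Engine engine) throws IOException {
        List<Long> seqNos = new ArrayList<>();
        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
            for (LeafReaderContext leaf : searcher.searcher().getIndexReader().leaves()) {
                NumericDocValues dv = leaf.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
                if (dv == null) {
                    continue;
                }
                while (dv.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
                    seqNos.add(dv.longValue()); // soft-deleted docs still appear here
                }
            }
        }
        Collections.sort(seqNos);
        return seqNos;
    }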

@@ -245,7 +245,11 @@ public void testDocumentFailureReplication() throws Exception {
         try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
             @Override
             protected EngineFactory getEngineFactory(ShardRouting routing) {
-                return throwingDocumentFailureEngineFactory;
+                if (routing.primary()){
+                    return throwingDocumentFailureEngineFactory; // Simulate exception only on the primary.
+                }else {
+                    return InternalEngine::new;
+                }
             }}) {
 
         // test only primary
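
The asymmetry here is deliberate: with this commit, a document failure on the primary is recorded as a no-op through innerNoOp (see the index() change above) and reaches the replicas as a no-op, so the replica side needs a working engine rather than the throwing one.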

@@ -77,6 +77,7 @@
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.EngineException;
+import org.elasticsearch.index.engine.EngineTestCase;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineFactory;
 import org.elasticsearch.index.engine.Segment;

@@ -3081,13 +3082,23 @@ public void onShardInactive(IndexShard indexShard) {
     public void testSupplyTombstoneDoc() throws Exception {
         IndexShard shard = newStartedShard();
         String id = randomRealisticUnicodeOfLengthBetween(1, 10);
-        ParsedDocument tombstone = shard.getEngine().config().getTombstoneDocSupplier().newTombstoneDoc("doc", id);
-        assertThat(tombstone.docs(), hasSize(1));
-        ParseContext.Document doc = tombstone.docs().get(0);
-        assertThat(doc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
-            containsInAnyOrder(SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME,
-                IdFieldMapper.NAME, VersionFieldMapper.NAME));
-        assertThat(doc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
+        ParsedDocument deleteTombstone = shard.getEngine().config().getTombstoneDocSupplier().newDeleteTombstoneDoc("doc", id);
+        assertThat(deleteTombstone.docs(), hasSize(1));
+        ParseContext.Document deleteDoc = deleteTombstone.docs().get(0);
+        assertThat(deleteDoc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
+            containsInAnyOrder(IdFieldMapper.NAME, VersionFieldMapper.NAME,
+                SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME));
+        assertThat(deleteDoc.getField(IdFieldMapper.NAME).binaryValue(), equalTo(Uid.encodeId(id)));
+        assertThat(deleteDoc.getField(SeqNoFieldMapper.TOMBSTONE_NAME).numericValue().longValue(), equalTo(1L));
+
+        ParsedDocument noopTombstone = shard.getEngine().config().getTombstoneDocSupplier().newNoopTombstoneDoc();
+        assertThat(noopTombstone.docs(), hasSize(1));
+        ParseContext.Document noopDoc = noopTombstone.docs().get(0);
+        assertThat(noopDoc.getFields().stream().map(IndexableField::name).collect(Collectors.toList()),
+            containsInAnyOrder(SeqNoFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME,
+                SeqNoFieldMapper.TOMBSTONE_NAME));
+        assertThat(noopDoc.getField(SeqNoFieldMapper.TOMBSTONE_NAME).numericValue().longValue(), equalTo(1L));
 
         closeShards(shard);
     }
 

@@ -132,7 +132,7 @@ indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilari
             eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
             (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
-            EngineTestCase::createTombstoneDoc);
+            EngineTestCase.tombstoneDocSupplier());
         engine = new InternalEngine(config);
         engine.recoverFromTranslog();
         listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);