Add tombstone document into Lucene for Noop (#30226)
This commit adds a tombstone document into Lucene for every no-op.
With this change, the Lucene index is expected to hold a complete history
of operations, just like the translog. Note that this guarantee is subject
to the soft-deletes retention merge policy.

Relates #29530
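
To make the history guarantee concrete, here is a sketch (not part of this commit) of how a complete operation history can be read back from such an index. It assumes only the _seq_no and _tombstone field names introduced below, and relies on a plain DirectoryReader (one not wrapped in Lucene's SoftDeletesDirectoryReaderWrapper) still seeing soft-deleted documents:

import java.io.IOException;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;

public class OperationHistorySketch {
    /** Prints every operation recorded in the index, soft-deleted tombstones included. */
    public static void printHistory(Directory directory) throws IOException {
        try (DirectoryReader reader = DirectoryReader.open(directory)) {
            for (LeafReaderContext leaf : reader.leaves()) {
                NumericDocValues seqNo = leaf.reader().getNumericDocValues("_seq_no");
                NumericDocValues tombstone = leaf.reader().getNumericDocValues("_tombstone");
                if (seqNo == null) {
                    continue; // segment without sequence numbers
                }
                int docId;
                while ((docId = seqNo.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                    // a doc carrying the _tombstone doc value is a delete or no-op; anything else is an index op
                    boolean isTombstone = tombstone != null && tombstone.advanceExact(docId);
                    System.out.println("seq_no=" + seqNo.longValue() + " tombstone=" + isTombstone);
                }
            }
        }
    }
}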
dnhatn authored May 2, 2018
1 parent 217d090 commit d621fc7
Showing 17 changed files with 328 additions and 66 deletions.
@@ -372,9 +372,16 @@ public LongSupplier getPrimaryTermSupplier() {
      * A supplier that supplies tombstone documents used in soft-update methods.
      * The returned document consists of only the _uid, _seqno, _term and _version fields; other metadata fields are excluded.
      */
-    @FunctionalInterface
     public interface TombstoneDocSupplier {
-        ParsedDocument newTombstoneDoc(String type, String id);
+        /**
+         * Creates a tombstone document for a delete operation.
+         */
+        ParsedDocument newDeleteTombstoneDoc(String type, String id);
+
+        /**
+         * Creates a tombstone document for a noop operation.
+         */
+        ParsedDocument newNoopTombstoneDoc();
     }
 
     public TombstoneDocSupplier getTombstoneDocSupplier() {
@@ -67,6 +67,7 @@
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
 import org.elasticsearch.index.seqno.LocalCheckpointTracker;
@@ -784,7 +785,9 @@ public IndexResult index(Index index) throws IOException {
                     location = translog.add(new Translog.Index(index, indexResult));
                 } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                     // if we have a document failure, record it as a no-op in the translog with the generated seq_no
-                    location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
+                    final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
+                        index.startTime(), indexResult.getFailure().getMessage());
+                    location = innerNoOp(noOp).getTranslogLocation();
                 } else {
                     location = null;
                 }
@@ -1226,11 +1229,13 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
         throws IOException {
         try {
             if (softDeleteEnabled) {
-                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newTombstoneDoc(delete.type(), delete.id());
+                final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
                 assert tombstone.docs().size() == 1 : "Tombstone doc should have a single doc [" + tombstone + "]";
                 tombstone.updateSeqID(plan.seqNoOfDeletion, delete.primaryTerm());
                 tombstone.version().setLongValue(plan.versionOfDeletion);
                 final ParseContext.Document doc = tombstone.docs().get(0);
+                assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null :
+                    "Delete tombstone document but _tombstone field is not set [" + doc + "]";
                 doc.add(softDeleteField);
                 if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
                     indexWriter.addDocument(doc);
@@ -1334,7 +1339,25 @@ private NoOpResult innerNoOp(final NoOp noOp) throws IOException {
         assert noOp.seqNo() > SequenceNumbers.NO_OPS_PERFORMED;
         final long seqNo = noOp.seqNo();
         try {
-            final NoOpResult noOpResult = new NoOpResult(noOp.seqNo());
+            Exception failure = null;
+            if (softDeleteEnabled) {
+                try {
+                    final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newNoopTombstoneDoc();
+                    tombstone.updateSeqID(noOp.seqNo(), noOp.primaryTerm());
+                    assert tombstone.docs().size() == 1 : "Tombstone should have a single doc [" + tombstone + "]";
+                    final ParseContext.Document doc = tombstone.docs().get(0);
+                    assert doc.getField(SeqNoFieldMapper.TOMBSTONE_NAME) != null
+                        : "Noop tombstone document but _tombstone field is not set [" + doc + "]";
+                    doc.add(softDeleteField);
+                    indexWriter.addDocument(doc);
+                } catch (Exception ex) {
+                    if (maybeFailEngine("noop", ex)) {
+                        throw ex;
+                    }
+                    failure = ex;
+                }
+            }
+            final NoOpResult noOpResult = failure != null ? new NoOpResult(noOp.seqNo(), failure) : new NoOpResult(noOp.seqNo());
             if (noOp.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
                 final Translog.Location location = translog.add(new Translog.NoOp(noOp.seqNo(), noOp.primaryTerm(), noOp.reason()));
                 noOpResult.setTranslogLocation(location);
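
The commit message's caveat matters here: tombstones are only soft-deleted, so the history written by innerNoOp stays complete only while the retention merge policy keeps soft-deleted documents alive. A minimal sketch of such a configuration follows (assumptions: the "__soft_deletes" field name mirrors Elasticsearch's Lucene.SOFT_DELETE_FIELD; testNoOps further down uses the same policy with a match-all retention query):

import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.MatchAllDocsQuery;

public class RetentionPolicySketch {
    static IndexWriterConfig newConfig() {
        return new IndexWriterConfig()
            .setSoftDeletesField("__soft_deletes")
            // retain every soft-deleted doc; a real engine would retain only operations above a retention seq_no
            .setMergePolicy(new SoftDeletesRetentionMergePolicy(
                "__soft_deletes", MatchAllDocsQuery::new, new TieredMergePolicy()));
    }
}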
@@ -124,7 +124,8 @@ public DocumentMapper build(MapperService mapperService) {
     private final Map<String, ObjectMapper> objectMappers;
 
     private final boolean hasNestedObjects;
-    private final MetadataFieldMapper[] tombstoneMetadataFieldMappers;
+    private final MetadataFieldMapper[] deleteTombstoneMetadataFieldMappers;
+    private final MetadataFieldMapper[] noopTombstoneMetadataFieldMappers;
 
     public DocumentMapper(MapperService mapperService, Mapping mapping) {
         this.mapperService = mapperService;
@@ -133,10 +134,6 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         final IndexSettings indexSettings = mapperService.getIndexSettings();
         this.mapping = mapping;
         this.documentParser = new DocumentParser(indexSettings, mapperService.documentMapperParser(), this);
-        final Collection<String> tombstoneFields =
-            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, VersionFieldMapper.NAME, IdFieldMapper.NAME);
-        this.tombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
-            .filter(field -> tombstoneFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
 
         // collect all the mappers for this type
         List<ObjectMapper> newObjectMappers = new ArrayList<>();
@@ -176,6 +173,15 @@ public DocumentMapper(MapperService mapperService, Mapping mapping) {
         } catch (Exception e) {
             throw new ElasticsearchGenerationException("failed to serialize source for type [" + type + "]", e);
         }
+
+        final Collection<String> deleteTombstoneMetadataFields = Arrays.asList(VersionFieldMapper.NAME, IdFieldMapper.NAME,
+            TypeFieldMapper.NAME, SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        this.deleteTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> deleteTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
+        final Collection<String> noopTombstoneMetadataFields =
+            Arrays.asList(SeqNoFieldMapper.NAME, SeqNoFieldMapper.PRIMARY_TERM_NAME, SeqNoFieldMapper.TOMBSTONE_NAME);
+        this.noopTombstoneMetadataFieldMappers = Stream.of(mapping.metadataMappers)
+            .filter(field -> noopTombstoneMetadataFields.contains(field.name())).toArray(MetadataFieldMapper[]::new);
     }
 
     public Mapping mapping() {
@@ -251,9 +257,15 @@ public ParsedDocument parse(SourceToParse source) throws MapperParsingException
         return documentParser.parseDocument(source, mapping.metadataMappers);
     }
 
-    public ParsedDocument createTombstoneDoc(String index, String type, String id) throws MapperParsingException {
+    public ParsedDocument createDeleteTombstoneDoc(String index, String type, String id) throws MapperParsingException {
         final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
-        return documentParser.parseDocument(emptySource, tombstoneMetadataFieldMappers);
+        return documentParser.parseDocument(emptySource, deleteTombstoneMetadataFieldMappers).toTombstone();
+    }
+
+    public ParsedDocument createNoopTombstoneDoc(String index) throws MapperParsingException {
+        final String id = ""; // _id won't be used.
+        final SourceToParse emptySource = SourceToParse.source(index, type, id, new BytesArray("{}"), XContentType.JSON);
+        return documentParser.parseDocument(emptySource, noopTombstoneMetadataFieldMappers).toTombstone();
     }
 
     /**
@@ -83,6 +83,17 @@ public void updateSeqID(long sequenceNumber, long primaryTerm) {
         this.seqID.primaryTerm.setLongValue(primaryTerm);
     }
 
+    /**
+     * Marks the document being processed as a tombstone document rather than a regular document.
+     * Tombstone documents are stored in the Lucene index to represent delete operations or no-ops.
+     */
+    ParsedDocument toTombstone() {
+        assert docs().size() == 1 : "Tombstone should have a single doc [" + docs() + "]";
+        this.seqID.tombstoneField.setLongValue(1);
+        rootDoc().add(this.seqID.tombstoneField);
+        return this;
+    }
+
     public String routing() {
         return this.routing;
     }
@@ -69,26 +69,29 @@ public static class SequenceIDFields {
         public final Field seqNo;
         public final Field seqNoDocValue;
         public final Field primaryTerm;
+        public final Field tombstoneField;
 
-        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm) {
+        public SequenceIDFields(Field seqNo, Field seqNoDocValue, Field primaryTerm, Field tombstoneField) {
             Objects.requireNonNull(seqNo, "sequence number field cannot be null");
             Objects.requireNonNull(seqNoDocValue, "sequence number dv field cannot be null");
             Objects.requireNonNull(primaryTerm, "primary term field cannot be null");
             this.seqNo = seqNo;
             this.seqNoDocValue = seqNoDocValue;
             this.primaryTerm = primaryTerm;
+            this.tombstoneField = tombstoneField;
         }
 
         public static SequenceIDFields emptySeqID() {
             return new SequenceIDFields(new LongPoint(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
                 new NumericDocValuesField(NAME, SequenceNumbers.UNASSIGNED_SEQ_NO),
-                new NumericDocValuesField(PRIMARY_TERM_NAME, 0));
+                new NumericDocValuesField(PRIMARY_TERM_NAME, 0), new NumericDocValuesField(TOMBSTONE_NAME, 0));
         }
     }
 
     public static final String NAME = "_seq_no";
     public static final String CONTENT_TYPE = "_seq_no";
     public static final String PRIMARY_TERM_NAME = "_primary_term";
+    public static final String TOMBSTONE_NAME = "_tombstone";
 
     public static class SeqNoDefaults {
         public static final String NAME = SeqNoFieldMapper.NAME;
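
Because _tombstone is written only as a NumericDocValuesField (set to 1 by ParsedDocument#toTombstone above) and never indexed as terms, readers can include or exclude tombstones with a doc-values existence check instead of a term query. A small sketch, assuming only the field name from this diff and Lucene's DocValuesFieldExistsQuery:

import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;

public class TombstoneQuerySketch {
    /** Matches only regular documents, filtering out delete and no-op tombstones. */
    static Query excludeTombstones() {
        return new BooleanQuery.Builder()
            .add(new MatchAllDocsQuery(), BooleanClause.Occur.MUST)
            .add(new DocValuesFieldExistsQuery("_tombstone"), BooleanClause.Occur.MUST_NOT)
            .build();
    }
}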
20 changes: 16 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -90,12 +90,14 @@
 import org.elasticsearch.index.flush.FlushStats;
 import org.elasticsearch.index.get.GetStats;
 import org.elasticsearch.index.get.ShardGetService;
+import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.mapper.DocumentMapperForType;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.MapperParsingException;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.RootObjectMapper;
 import org.elasticsearch.index.mapper.SourceToParse;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
@@ -2162,8 +2164,7 @@ private EngineConfig newEngineConfig() {
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm,
-            this::createTombstoneDoc);
+            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, this::getPrimaryTerm, tombstoneDocSupplier());
     }
 
     /**
@@ -2592,7 +2593,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
             }
         }
 
-    private ParsedDocument createTombstoneDoc(String type, String id) {
-        return docMapper(type).getDocumentMapper().createTombstoneDoc(shardId.getIndexName(), type, id);
+    private EngineConfig.TombstoneDocSupplier tombstoneDocSupplier() {
+        final RootObjectMapper.Builder noopRootMapper = new RootObjectMapper.Builder("__noop");
+        final DocumentMapper noopDocumentMapper = new DocumentMapper.Builder(noopRootMapper, mapperService).build(mapperService);
+        return new EngineConfig.TombstoneDocSupplier() {
+            @Override
+            public ParsedDocument newDeleteTombstoneDoc(String type, String id) {
+                return docMapper(type).getDocumentMapper().createDeleteTombstoneDoc(shardId.getIndexName(), type, id);
+            }
+            @Override
+            public ParsedDocument newNoopTombstoneDoc() {
+                return noopDocumentMapper.createNoopTombstoneDoc(shardId.getIndexName());
+            }
+        };
     }
 }
@@ -391,6 +391,7 @@ public void testPrimaryReplicaResyncFailed() throws Exception {
                 assertThat(shard.getLocalCheckpoint(), equalTo(numDocs + moreDocs));
             }
         }, 30, TimeUnit.SECONDS);
+        internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
     }
 
 }
@@ -103,6 +103,7 @@
 import org.elasticsearch.index.mapper.ContentPath;
 import org.elasticsearch.index.mapper.IdFieldMapper;
 import org.elasticsearch.index.mapper.Mapper.BuilderContext;
+import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.mapper.Mapping;
 import org.elasticsearch.index.mapper.MetadataFieldMapper;
 import org.elasticsearch.index.mapper.ParseContext;
@@ -173,6 +174,7 @@
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -2603,7 +2605,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
             new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
             IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
             config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
-            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, EngineTestCase::createTombstoneDoc);
+            new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
             fail("translog belongs to a different engine");
@@ -3046,7 +3048,8 @@ public void testDoubleDeliveryReplica() throws IOException {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
             assertEquals(1, topDocs.totalHits);
         }
-        assertThat(getOperationSeqNoInLucene(engine), contains(20L));
+        List<Translog.Operation> ops = readAllOperationsInLucene(engine, createMapperService("test"));
+        assertThat(ops.stream().map(o -> o.seqNo()).collect(Collectors.toList()), hasItem(20L));
     }
 
     public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
@@ -3606,7 +3609,9 @@ public void testNoOps() throws IOException {
                 maxSeqNo,
                 localCheckpoint);
             trimUnsafeCommits(engine.config());
-            noOpEngine = new InternalEngine(engine.config(), supplier) {
+            EngineConfig noopEngineConfig = copy(engine.config(), new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD,
+                () -> new MatchAllDocsQuery(), engine.config().getMergePolicy()));
+            noOpEngine = new InternalEngine(noopEngineConfig, supplier) {
                 @Override
                 protected long doGenerateSeqNoForOperation(Operation operation) {
                     throw new UnsupportedOperationException();
@@ -3636,6 +3641,13 @@ protected long doGenerateSeqNoForOperation(Operation operation) {
             assertThat(noOp.seqNo(), equalTo((long) (maxSeqNo + 2)));
             assertThat(noOp.primaryTerm(), equalTo(primaryTerm.get()));
             assertThat(noOp.reason(), equalTo(reason));
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> operationsFromLucene = readAllOperationsInLucene(noOpEngine, mapperService);
+            assertThat(operationsFromLucene, hasSize(maxSeqNo + 2 - localCheckpoint)); // gap-filling no-ops plus the two manual no-ops
+            for (int i = 0; i < operationsFromLucene.size(); i++) {
+                assertThat(operationsFromLucene.get(i), equalTo(new Translog.NoOp(localCheckpoint + 1 + i, primaryTerm.get(), "")));
+            }
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(noOpEngine, mapperService);
         } finally {
             IOUtils.close(noOpEngine);
         }
@@ -4603,7 +4615,10 @@ private void assertOperationHistoryInLucene(List<Engine.Operation> operations) t
                     engine.forceMerge(true);
                 }
             }
-            assertThat(getOperationSeqNoInLucene(engine), containsInAnyOrder(expectedSeqNos.toArray()));
+            MapperService mapperService = createMapperService("test");
+            List<Translog.Operation> actualOps = readAllOperationsInLucene(engine, mapperService);
+            assertThat(actualOps.stream().map(o -> o.seqNo()).collect(Collectors.toList()), containsInAnyOrder(expectedSeqNos.toArray()));
+            assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
         }
     }
 
@@ -245,7 +245,11 @@ public void testDocumentFailureReplication() throws Exception {
         try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) {
             @Override
             protected EngineFactory getEngineFactory(ShardRouting routing) {
-                return throwingDocumentFailureEngineFactory;
+                if (routing.primary()) {
+                    return throwingDocumentFailureEngineFactory; // simulate a document failure only on the primary
+                } else {
+                    return InternalEngine::new;
+                }
             }}) {
 
         // test only primary