Introduce soft-deletes retention policy based on global checkpoint (#30335)

This commit introduces a soft-deletes retention merge policy based on
the global checkpoint. Some notes on this simple retention policy:

- This policy keeps all operations whose seq# is greater than the
persisted global checkpoint, plus a configurable number of extra
operations prior to the global checkpoint (see the configuration sketch
below). This is good enough for querying the history of changes.

- This policy is not watertight for peer recovery. We send the
safe commit in peer recovery, so we also need to send all operations
after the local checkpoint of that commit. This is analogous to the
minimum translog generation for recovery.

- This policy is too simple to support rollback.

Relates #29530
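For reference, a minimal sketch of how an index could opt into this policy via the two settings involved (index.soft_deletes and the new index.soft_deletes.retention.operations). The retention value of 16 is an arbitrary example, not a recommendation from this commit:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;

public class SoftDeletesRetentionSettingsExample {
    // Builds index settings that enable soft deletes and additionally retain
    // 16 operations below the global checkpoint (example value only).
    static Settings softDeletesRetentionSettings() {
        return Settings.builder()
            .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)                    // index.soft_deletes
            .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 16) // index.soft_deletes.retention.operations
            .build();
    }
}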
dnhatn authored May 5, 2018
1 parent 6e0d0fe commit 2c73969
Showing 7 changed files with 181 additions and 18 deletions.
@@ -131,6 +131,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_SETTING,
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING,
IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING,
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING,
EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING,
22 changes: 22 additions & 0 deletions server/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -242,6 +242,14 @@ public final class IndexSettings {
*/
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope);

/**
* Controls how many soft-deleted documents will be kept around before being merged away. Keeping more deleted
* documents increases the chance of operation-based recoveries and allows querying a longer history of documents.
* If soft deletes are enabled, an engine will by default retain all operations whose seq# is greater than the global checkpoint.
*/
public static final Setting<Long> INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING =
Setting.longSetting("index.soft_deletes.retention.operations", 0, 0, Property.IndexScope, Property.Dynamic);

/**
* The maximum number of refresh listeners allowed on this shard.
*/
@@ -287,6 +295,7 @@ public final class IndexSettings {
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
private final boolean softDeleteEnabled;
private volatile long softDeleteRetentionOperations;
private volatile boolean warmerEnabled;
private volatile int maxResultWindow;
private volatile int maxInnerResultWindow;
@@ -398,6 +407,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
softDeleteEnabled = version.onOrAfter(Version.V_7_0_0_alpha1) && scopedSettings.get(INDEX_SOFT_DELETES_SETTING);
softDeleteRetentionOperations = scopedSettings.get(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING);
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING);
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING);
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING);
@@ -455,6 +465,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; }
@@ -837,4 +848,15 @@ public boolean isExplicitRefresh() {
public boolean isSoftDeleteEnabled() {
return softDeleteEnabled;
}

private void setSoftDeleteRetentionOperations(long ops) {
this.softDeleteRetentionOperations = ops;
}

/**
* Returns the number of extra operations (i.e. soft-deleted documents) to be kept for recoveries and history purposes.
*/
public long getSoftDeleteRetentionOperations() {
return this.softDeleteRetentionOperations;
}
}
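Because the new setting is declared with Property.Dynamic and wired to setSoftDeleteRetentionOperations above, it can be changed on a live index. A hedged sketch of such an update, assuming a connected Client and an index named "my-index" (neither is part of this commit):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class UpdateRetentionSettingExample {
    // Drops the extra retained soft-delete operations back to the default of 0
    // on a hypothetical index named "my-index".
    static void resetRetention(Client client) {
        client.admin().indices().prepareUpdateSettings("my-index")
            .setSettings(Settings.builder().put("index.soft_deletes.retention.operations", 0).build())
            .get();
    }
}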
@@ -22,6 +22,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
@@ -34,8 +35,10 @@
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
@@ -2002,8 +2005,8 @@ private IndexWriterConfig getIndexWriterConfig() {
// background merges
MergePolicy mergePolicy = config().getMergePolicy();
if (softDeleteEnabled) {
// TODO: soft-delete retention policy
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD);
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, this::softDeletesRetentionQuery, mergePolicy);
}
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
@@ -2016,6 +2019,20 @@ private IndexWriterConfig getIndexWriterConfig() {
return iwc;
}

/**
* Soft-deleted documents, including tombstones, that match this query will be retained and won't be cleaned up by merges.
*/
private Query softDeletesRetentionQuery() {
ensureOpen();
// TODO: We send the safe commit in peer-recovery, thus we need to retain all operations after the local checkpoint of that commit.
final long retainedExtraOps = engineConfig.getIndexSettings().getSoftDeleteRetentionOperations();
// Prefer the global checkpoint that is persisted on disk over the in-memory value.
// If we failed to fsync the global checkpoint but already used a higher in-memory value to clean up soft-deleted ops,
// then after a restart we may not have all the required operations whose seq# is greater than the global checkpoint.
final long persistedGlobalCheckpoint = translog.getLastSyncedGlobalCheckpoint();
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
}
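To make the retention window concrete, here is a plain-Lucene sketch of the same wiring with worked numbers: with a persisted global checkpoint of 100 and 5 extra retained operations, the retention query keeps every operation whose seq# is at least 96 (100 + 1 - 5). The analyzer, the soft-deletes field name, and the wrapped merge policy are illustrative stand-ins, not what the engine uses verbatim (the engine uses Lucene.SOFT_DELETE_FIELD, the translog's last synced global checkpoint, and the configured merge policy):

import java.util.function.Supplier;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.Query;

public class RetentionMergePolicyExample {
    static IndexWriterConfig retentionAwareConfig() {
        long persistedGlobalCheckpoint = 100; // example value
        long retainedExtraOps = 5;            // example value
        // Retain every operation whose seq# is >= 96, i.e. 100 + 1 - 5.
        Supplier<Query> retentionQuery = () ->
            LongPoint.newRangeQuery("_seq_no", persistedGlobalCheckpoint + 1 - retainedExtraOps, Long.MAX_VALUE);
        IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer());
        iwc.setSoftDeletesField("__soft_deletes"); // stand-in for Lucene.SOFT_DELETE_FIELD
        iwc.setMergePolicy(new SoftDeletesRetentionMergePolicy("__soft_deletes", retentionQuery, new TieredMergePolicy()));
        return iwc;
    }
}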

/** Extended SearcherFactory that warms the segments if needed when acquiring a new searcher */
static final class SearchFactory extends EngineSearcherFactory {
private final Engine.Warmer warmer;
@@ -123,6 +123,7 @@
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.IndexSettingsModule;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;

@@ -178,6 +179,7 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
@@ -251,8 +253,9 @@ public void testVersionMapAfterAutoIDDocument() throws IOException {
}

public void testSegments() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
try (Store store = createStore();
InternalEngine engine = createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {
InternalEngine engine = createEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get))) {
List<Segment> segments = engine.segments(false);
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats(false).getCount(), equalTo(0L));
@@ -324,6 +327,8 @@ public void testSegments() throws Exception {


engine.delete(new Engine.Delete("test", "1", newUid(doc), primaryTerm.get()));
globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
engine.getTranslog().sync();
engine.refresh("test");

segments = engine.segments(false);
@@ -1279,9 +1284,13 @@ public void testVersioningNewIndex() throws IOException {
assertThat(indexResult.getVersion(), equalTo(1L));
}

public void testForceMerge() throws IOException {
public void testForceMergeWithoutSoftDeletes() throws IOException {
Settings settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false).build();
IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
try (Store store = createStore();
Engine engine = createEngine(config(defaultSettings, store, createTempDir(),
Engine engine = createEngine(config(IndexSettingsModule.newIndexSettings(indexMetaData), store, createTempDir(),
new LogByteSizeMergePolicy(), null))) { // use log MP here as we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Expand Down Expand Up @@ -1322,6 +1331,66 @@ public void testForceMerge() throws IOException {
}
}

public void testForceMergeWithSoftDeletesRetention() throws Exception {
final long retainedExtraOps = randomLongBetween(0, 10);
Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), retainedExtraOps);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final MapperService mapperService = createMapperService("test");
final Set<String> liveDocs = new HashSet<>();
try (Store store = createStore();
Engine engine = createEngine(config(indexSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get))) {
int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.type(), doc.id(), newUid(doc.id()), primaryTerm.get()));
liveDocs.remove(doc.id());
}
if (randomBoolean()) {
engine.index(indexForDoc(doc));
liveDocs.add(doc.id());
}
}
long localCheckpoint = engine.getLocalCheckpointTracker().getCheckpoint();
globalCheckpoint.set(randomLongBetween(0, localCheckpoint));
engine.getTranslog().sync();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
Map<Long, Translog.Operation> ops = readAllOperationsInLucene(engine, mapperService)
.stream().collect(Collectors.toMap(Translog.Operation::seqNo, Function.identity()));
for (long seqno = 0; seqno <= localCheckpoint; seqno++) {
long keptIndex = globalCheckpoint.get() + 1 - retainedExtraOps;
String msg = "seq# [" + seqno + "], global checkpoint [" + globalCheckpoint + "], retained-ops [" + retainedExtraOps + "]";
if (seqno < keptIndex) {
Translog.Operation op = ops.get(seqno);
if (op != null) {
assertThat(op, instanceOf(Translog.Index.class));
assertThat(msg, ((Translog.Index) op).id(), isIn(liveDocs));
}
} else {
assertThat(msg, ops.get(seqno), notNullValue());
}
}
settings.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 0);
indexSettings.updateIndexMetaData(IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build());
globalCheckpoint.set(localCheckpoint);
engine.getTranslog().sync();
engine.forceMerge(true, 1, false, false, false);
assertConsistentHistoryBetweenTranslogAndLuceneIndex(engine, mapperService);
assertThat(readAllOperationsInLucene(engine, mapperService), hasSize(liveDocs.size()));
}
}

public void testForceMergeAndClose() throws IOException, InterruptedException {
int numIters = randomIntBetween(2, 10);
for (int j = 0; j < numIters; j++) {
@@ -2525,14 +2594,16 @@ public void testSkipTranslogReplay() throws IOException {
Engine.IndexResult indexResult = engine.index(firstIndexRequest);
assertThat(indexResult.getVersion(), equalTo(1L));
}
EngineConfig config = engine.config();
assertVisibleCount(engine, numDocs);
engine.close();
trimUnsafeCommits(engine.config());
engine = new InternalEngine(engine.config());
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits, equalTo(0L));
trimUnsafeCommits(config);
try (InternalEngine engine = new InternalEngine(config)) {
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits, equalTo(0L));
}
}
}

@@ -2350,7 +2350,16 @@ public void testDocStats() throws IOException {
deleteDoc(indexShard, "_doc", id);
indexDoc(indexShard, "_doc", id);
}

// Need to update and sync the global checkpoint as the soft-deletes retention MergePolicy depends on it.
if (indexShard.indexSettings.isSoftDeleteEnabled()) {
if (indexShard.routingEntry().primary()) {
indexShard.updateGlobalCheckpointForShard(indexShard.routingEntry().allocationId().getId(),
indexShard.getLocalCheckpoint());
} else {
indexShard.updateGlobalCheckpointOnReplica(indexShard.getLocalCheckpoint(), "test");
}
indexShard.sync();
}
// flush the buffered deletes
final FlushRequest flushRequest = new FlushRequest();
flushRequest.force(false);
@@ -2910,6 +2919,7 @@ public void testSegmentMemoryTrackedInBreaker() throws Exception {

// Deleting a doc causes its memory to be freed from the breaker
deleteDoc(primary, "_doc", "0");
primary.sync(); // need to sync global checkpoint as the soft-deletes retention MergePolicy depends on it.
primary.refresh("force refresh");

ss = primary.segmentStats(randomBoolean());
@@ -43,13 +43,15 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesQueryCache;
import org.elasticsearch.indices.IndicesRequestCache;
@@ -69,6 +71,7 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
@@ -1005,10 +1008,15 @@ private void assertCumulativeQueryCacheStats(IndicesStatsResponse response) {
}

public void testFilterCacheStats() throws Exception {
assertAcked(prepareCreate("index").setSettings(Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build()).get());
indexRandom(true,
Settings settings = Settings.builder().put(indexSettings()).put("number_of_replicas", 0).build();
assertAcked(prepareCreate("index").setSettings(settings).get());
indexRandom(false, true,
client().prepareIndex("index", "type", "1").setSource("foo", "bar"),
client().prepareIndex("index", "type", "2").setSource("foo", "baz"));
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
refresh();
ensureGreen();

IndicesStatsResponse response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
@@ -1039,6 +1047,9 @@

assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "1").get().getResult());
assertEquals(DocWriteResponse.Result.DELETED, client().prepareDelete("index", "type", "2").get().getResult());
if (IndexSettings.INDEX_SOFT_DELETES_SETTING.get(settings)) {
persistGlobalCheckpoint("index"); // Need to persist the global checkpoint for the soft-deletes retention MP.
}
refresh();
response = client().admin().indices().prepareStats("index").setQueryCache(true).get();
assertCumulativeQueryCacheStats(response);
@@ -1172,4 +1183,21 @@ public void testConcurrentIndexingAndStatsRequests() throws BrokenBarrierExcepti
assertThat(executionFailures.get(), emptyCollectionOf(Exception.class));
}


/**
* Persist the global checkpoint on all shards of the given index to disk.
* This makes sure that the persisted global checkpoint on those shards equals the in-memory value.
*/
private void persistGlobalCheckpoint(String index) throws Exception {
final Set<String> nodes = internalCluster().nodesInclude(index);
for (String node : nodes) {
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
for (IndexService indexService : indexServices) {
for (IndexShard indexShard : indexService) {
indexShard.sync();
assertThat(indexShard.getLastSyncedGlobalCheckpoint(), equalTo(indexShard.getGlobalCheckpoint()));
}
}
}
}
}