-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use soft-update to maintain document history #29458
Changes from 3 commits
f898440
c9d1fd1
6f3eb34
754b0e2
90ab934
82ef84d
29cbece
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -238,6 +238,12 @@ public final class IndexSettings { | |
public static final Setting<TimeValue> INDEX_GC_DELETES_SETTING = | ||
Setting.timeSetting("index.gc_deletes", DEFAULT_GC_DELETES, new TimeValue(-1, TimeUnit.MILLISECONDS), Property.Dynamic, | ||
Property.IndexScope); | ||
|
||
/** | ||
* Specifies if the index should use soft-delete instead of hard-delete for update/delete operations. | ||
*/ | ||
public static final Setting<Boolean> INDEX_SOFT_DELETES_SETTING = Setting.boolSetting("index.soft_deletes", true, Property.IndexScope); | ||
|
||
/** | ||
* The maximum number of refresh listeners allows on this shard. | ||
*/ | ||
|
@@ -296,6 +302,7 @@ public final class IndexSettings { | |
private final IndexSortConfig indexSortConfig; | ||
private final IndexScopedSettings scopedSettings; | ||
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); | ||
private final boolean softDeleteEnabled; | ||
private volatile boolean warmerEnabled; | ||
private volatile int maxResultWindow; | ||
private volatile int maxInnerResultWindow; | ||
|
@@ -412,6 +419,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti | |
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING); | ||
mergeSchedulerConfig = new MergeSchedulerConfig(this); | ||
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis(); | ||
softDeleteEnabled = scopedSettings.get(INDEX_SOFT_DELETES_SETTING); | ||
warmerEnabled = scopedSettings.get(INDEX_WARMER_ENABLED_SETTING); | ||
maxResultWindow = scopedSettings.get(MAX_RESULT_WINDOW_SETTING); | ||
maxInnerResultWindow = scopedSettings.get(MAX_INNER_RESULT_WINDOW_SETTING); | ||
|
@@ -868,4 +876,11 @@ public boolean isExplicitRefresh() { | |
* Returns the time that an index shard becomes search idle unless it's accessed in between | ||
*/ | ||
public TimeValue getSearchIdleAfter() { return searchIdleAfter; } | ||
|
||
/** | ||
* Returns <code>true</code> if soft-delete is enabled. | ||
*/ | ||
public boolean isSoftDeleteEnabled() { | ||
return getIndexVersionCreated().onOrAfter(Version.V_7_0_0_alpha1) && softDeleteEnabled; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe resolve this in the ctor? |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,8 +31,10 @@ | |
import org.apache.lucene.index.MergePolicy; | ||
import org.apache.lucene.index.SegmentCommitInfo; | ||
import org.apache.lucene.index.SegmentInfos; | ||
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; | ||
import org.apache.lucene.index.Term; | ||
import org.apache.lucene.search.IndexSearcher; | ||
import org.apache.lucene.search.MatchAllDocsQuery; | ||
import org.apache.lucene.search.ReferenceManager; | ||
import org.apache.lucene.search.SearcherFactory; | ||
import org.apache.lucene.search.SearcherManager; | ||
|
@@ -68,7 +70,6 @@ | |
import org.elasticsearch.index.seqno.LocalCheckpointTracker; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.shard.ElasticsearchMergePolicy; | ||
import org.elasticsearch.index.shard.IndexingStats; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.index.translog.TranslogConfig; | ||
|
@@ -141,6 +142,7 @@ public class InternalEngine extends Engine { | |
private final CounterMetric numDocDeletes = new CounterMetric(); | ||
private final CounterMetric numDocAppends = new CounterMetric(); | ||
private final CounterMetric numDocUpdates = new CounterMetric(); | ||
private final boolean softDeleteEnabled; | ||
|
||
/** | ||
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this | ||
|
@@ -165,6 +167,7 @@ public InternalEngine(EngineConfig engineConfig) { | |
maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE); | ||
} | ||
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME; | ||
this.softDeleteEnabled = engineConfig.getIndexSettings().isSoftDeleteEnabled(); | ||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy( | ||
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(), | ||
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis() | ||
|
@@ -772,6 +775,7 @@ public IndexResult index(Index index) throws IOException { | |
} else if (plan.indexIntoLucene) { | ||
indexResult = indexIntoLucene(index, plan); | ||
} else { | ||
// TODO: We need to index stale documents to have a full history in Lucene. | ||
indexResult = new IndexResult( | ||
plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); | ||
} | ||
|
@@ -1070,10 +1074,18 @@ private boolean assertDocDoesNotExist(final Index index, final boolean allowDele | |
} | ||
|
||
private void updateDocs(final Term uid, final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException { | ||
if (docs.size() > 1) { | ||
indexWriter.updateDocuments(uid, docs); | ||
if (softDeleteEnabled) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think in the delete case we should also use |
||
if (docs.size() > 1) { | ||
indexWriter.softUpdateDocuments(uid, docs, Lucene.getSoftDeleteDVMarker()); | ||
} else { | ||
indexWriter.softUpdateDocument(uid, docs.get(0), Lucene.getSoftDeleteDVMarker()); | ||
} | ||
} else { | ||
indexWriter.updateDocument(uid, docs.get(0)); | ||
if (docs.size() > 1) { | ||
indexWriter.updateDocuments(uid, docs); | ||
} else { | ||
indexWriter.updateDocument(uid, docs.get(0)); | ||
} | ||
} | ||
numDocUpdates.inc(docs.size()); | ||
} | ||
|
@@ -1918,6 +1930,7 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx | |
return new IndexWriter(directory, iwc); | ||
} | ||
|
||
|
||
private IndexWriterConfig getIndexWriterConfig() { | ||
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); | ||
iwc.setCommitOnClose(false); // we by default don't commit on close | ||
|
@@ -1931,11 +1944,15 @@ private IndexWriterConfig getIndexWriterConfig() { | |
} | ||
iwc.setInfoStream(verbose ? InfoStream.getDefault() : new LoggerInfoStream(logger)); | ||
iwc.setMergeScheduler(mergeScheduler); | ||
MergePolicy mergePolicy = config().getMergePolicy(); | ||
// Give us the opportunity to upgrade old segments while performing | ||
// background merges | ||
mergePolicy = new ElasticsearchMergePolicy(mergePolicy); | ||
iwc.setMergePolicy(mergePolicy); | ||
MergePolicy mergePolicy = config().getMergePolicy(); | ||
if (softDeleteEnabled) { | ||
iwc.setSoftDeletesField(Lucene.SOFT_DELETE_FIELD); | ||
// TODO: soft-delete retention policy | ||
mergePolicy = new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETE_FIELD, () -> new MatchAllDocsQuery(), mergePolicy); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need this here since it's match all. Lets just omit it until we use it |
||
} | ||
iwc.setMergePolicy(new ElasticsearchMergePolicy(mergePolicy)); | ||
iwc.setSimilarity(engineConfig.getSimilarity()); | ||
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac()); | ||
iwc.setCodec(engineConfig.getCodec()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
package org.elasticsearch.index.shard; | ||
|
||
import org.apache.logging.log4j.Logger; | ||
import org.apache.lucene.index.CodecReader; | ||
import org.apache.lucene.index.IndexWriter; | ||
import org.apache.lucene.index.MergePolicy; | ||
import org.apache.lucene.index.MergeTrigger; | ||
|
@@ -144,6 +145,11 @@ public boolean useCompoundFile(SegmentInfos segments, SegmentCommitInfo newSegme | |
return delegate.useCompoundFile(segments, newSegment, writer); | ||
} | ||
|
||
@Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of doing this I think we should extend |
||
public boolean keepFullyDeletedSegment(CodecReader reader) throws IOException { | ||
return delegate.keepFullyDeletedSegment(reader); | ||
} | ||
|
||
/** | ||
* When <code>upgrade</code> is true, running a force merge will upgrade any segments written | ||
* with older versions. This will apply to the next call to | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,8 @@ | |
import org.apache.lucene.document.StoredField; | ||
import org.apache.lucene.document.TextField; | ||
import org.apache.lucene.index.DirectoryReader; | ||
import org.apache.lucene.index.FilterDirectoryReader; | ||
import org.apache.lucene.index.FilterLeafReader; | ||
import org.apache.lucene.index.IndexCommit; | ||
import org.apache.lucene.index.IndexReader; | ||
import org.apache.lucene.index.IndexWriter; | ||
|
@@ -119,6 +121,7 @@ | |
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.index.translog.TranslogConfig; | ||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; | ||
import org.elasticsearch.test.IndexSettingsModule; | ||
import org.hamcrest.MatcherAssert; | ||
import org.hamcrest.Matchers; | ||
|
||
|
@@ -504,7 +507,7 @@ public void testSegmentsWithMergeFlag() throws Exception { | |
|
||
if (flush) { | ||
// we should have had just 1 merge, so last generation should be exact | ||
assertEquals(gen2, store.readLastCommittedSegmentsInfo().getLastGeneration()); | ||
assertEquals(gen2 + 1, store.readLastCommittedSegmentsInfo().getLastGeneration()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm why does this need to change? |
||
} | ||
} | ||
} | ||
|
@@ -2711,6 +2714,12 @@ private void maybeThrowFailure() throws IOException { | |
} | ||
} | ||
|
||
@Override | ||
public long softUpdateDocument(Term term, Iterable<? extends IndexableField> doc, Field... softDeletes) throws IOException { | ||
maybeThrowFailure(); | ||
return super.softUpdateDocument(term, doc, softDeletes); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ++ |
||
} | ||
|
||
@Override | ||
public long deleteDocuments(Term... terms) throws IOException { | ||
maybeThrowFailure(); | ||
|
@@ -4537,6 +4546,41 @@ public void testTrimUnsafeCommits() throws Exception { | |
} | ||
} | ||
|
||
/** | ||
* A simple test checks that the document history is maintained when a document is updated. | ||
*/ | ||
public void testUpdateMaintainDocumentHistory() throws Exception { | ||
final IndexMetaData indexMetaData = IndexMetaData.builder("test") | ||
.settings(settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)) | ||
.numberOfShards(1).numberOfReplicas(1).build(); | ||
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData); | ||
final String docId = "doc-id"; | ||
final int versions = between(2, 20); | ||
try (Store store = createStore(); | ||
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), new LogDocMergePolicy())) { | ||
final ParsedDocument doc = testParsedDocument(docId, null, testDocument(), new BytesArray("{}"), null); | ||
for (int version = 1; version <= versions; version++) { | ||
Engine.IndexResult indexResult = engine.index(indexForDoc(doc)); | ||
assertThat(indexResult.getFailure(), nullValue()); | ||
if (randomBoolean()) { | ||
engine.flush(); | ||
} | ||
} | ||
engine.refresh("test"); | ||
try (Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) { | ||
assertThat(searcher.reader().numDocs(), equalTo(1)); | ||
assertThat(searcher.reader().numDeletedDocs(), equalTo(versions - 1)); | ||
try (Engine.GetResult get = engine.get(new Engine.Get(true, false, doc.type(), doc.id(), newUid(doc)), engine::acquireSearcher)) { | ||
assertThat((int) get.version(), equalTo(versions)); | ||
} | ||
// Use a reader which includes soft-deleted documents to verify history. | ||
final UnfilteredReader unfilteredReader = new UnfilteredReader(searcher.getDirectoryReader()); | ||
assertThat(unfilteredReader.numDocs(), equalTo(versions)); | ||
assertThat(unfilteredReader.numDeletedDocs(), equalTo(0)); | ||
} | ||
} | ||
} | ||
|
||
private static void trimUnsafeCommits(EngineConfig config) throws IOException { | ||
final Store store = config.getStore(); | ||
final TranslogConfig translogConfig = config.getTranslogConfig(); | ||
|
@@ -4555,4 +4599,50 @@ void assertLuceneOperations(InternalEngine engine, long expectedAppends, long ex | |
assertThat(message, engine.getNumDocUpdates(), equalTo(expectedUpdates)); | ||
assertThat(message, engine.getNumDocDeletes(), equalTo(expectedDeletes)); | ||
} | ||
|
||
/** | ||
* A reader that does not exclude soft-deleted documents. | ||
*/ | ||
static final class UnfilteredReader extends FilterDirectoryReader { | ||
static final class UnfilteredSubReaderWrapper extends SubReaderWrapper { | ||
@Override | ||
public LeafReader wrap(LeafReader in) { | ||
return new FilterLeafReader(in) { | ||
@Override | ||
public CacheHelper getCoreCacheHelper() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can just move this entire thing to a util method in Lucene.java then we can reuse it in other places WDYT? @martijnvg you need this too somewhere right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed the test and this wrapper in this PR, and will add it back to Lucene.java later. I prefer to keep this change as a cut-over. |
||
return null; | ||
} | ||
|
||
@Override | ||
public CacheHelper getReaderCacheHelper() { | ||
return null; | ||
} | ||
|
||
@Override | ||
public int numDocs() { | ||
return maxDoc(); | ||
} | ||
|
||
@Override | ||
public Bits getLiveDocs() { | ||
return null; | ||
} | ||
}; | ||
} | ||
} | ||
|
||
UnfilteredReader(DirectoryReader in) throws IOException { | ||
super(in, new UnfilteredSubReaderWrapper()); | ||
} | ||
|
||
@Override | ||
protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) throws IOException { | ||
return new UnfilteredReader(in); | ||
} | ||
|
||
@Override | ||
public CacheHelper getReaderCacheHelper() { | ||
return null; // we are modifying live docs. | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call this
newSoftDeletesField
?