Skip to content

Commit

Permalink
Update merge on refresh and merge on commit defaults in Opensearch (L…
Browse files Browse the repository at this point in the history
…ucene 9.3) (opensearch-project#3561)

* Update merge on refresh and merge on commit defaults in Opensearch (Lucene 9.3)

Signed-off-by: Andriy Redko <[email protected]>

* Addressing code review comments, adding more tests

Signed-off-by: Andriy Redko <[email protected]>
(cherry picked from commit a34a4c0)
Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Jul 26, 2022
1 parent 888d4bf commit ce3af3c
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
50 changes: 46 additions & 4 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
package org.opensearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.Strings;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
Expand All @@ -53,9 +54,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;

import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING;
import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING;
Expand All @@ -73,6 +76,9 @@
* @opensearch.internal
*/
public final class IndexSettings {
private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default";
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";

public static final Setting<List<String>> DEFAULT_FIELD_SETTING = Setting.listSetting(
"index.query.default_field",
Collections.singletonList("*"),
Expand Down Expand Up @@ -515,14 +521,21 @@ public final class IndexSettings {
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
"index.merge_on_flush.max_full_flush_merge_wait_time",
new TimeValue(10, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
new TimeValue(1, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
"index.merge_on_flush.enabled",
false,
true, /* https://issues.apache.org/jira/browse/LUCENE-10078 */
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString(
"index.merge_on_flush.policy",
MERGE_ON_FLUSH_DEFAULT_POLICY,
Property.IndexScope,
Property.Dynamic
);
Expand Down Expand Up @@ -617,6 +630,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
* Is merge of flush enabled or not
*/
private volatile boolean mergeOnFlushEnabled;
/**
* Specialized merge-on-flush policy if provided
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

/**
* Returns the default search fields for this index.
Expand Down Expand Up @@ -733,6 +750,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
Expand Down Expand Up @@ -804,6 +822,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -874,7 +893,7 @@ public String getUUID() {
* Returns <code>true</code> if the index has a custom data path
*/
public boolean hasCustomDataPath() {
return Strings.isNotEmpty(customDataPath());
return !Strings.isEmpty(customDataPath());
}

/**
Expand Down Expand Up @@ -1390,4 +1409,27 @@ public TimeValue getMaxFullFlushMergeWaitTime() {
public boolean isMergeOnFlushEnabled() {
return mergeOnFlushEnabled;
}

private void setMergeOnFlushPolicy(String policy) {
if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) {
mergeOnFlushPolicy = null;
} else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) {
this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new;
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey()
+ " has unsupported policy specified: "
+ policy
+ ". Please use one of: "
+ MERGE_ON_FLUSH_DEFAULT_POLICY
+ ", "
+ MERGE_ON_FLUSH_MERGE_POLICY
);
}
}

public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -133,6 +132,7 @@
import java.util.function.BiFunction;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -2448,14 +2448,14 @@ private IndexWriterConfig getIndexWriterConfig() {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
final Optional<UnaryOperator<MergePolicy>> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy();
if (mergeOnFlushPolicy.isPresent()) {
mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy);
}
}
} else {
// Disable merge on refresh
iwc.setMaxFullFlushMergeWaitMillis(0);
}

iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,8 @@ public void testWrapAllDocsLive() throws Exception {
public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception {
Directory dir = newDirectory();
IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()));
// override 500ms default introduced in
// https://issues.apache.org/jira/browse/LUCENE-10078
config.setMaxFullFlushMergeWaitMillis(0);
.setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy()))
.setMaxFullFlushMergeWaitMillis(0);
IndexWriter writer = new IndexWriter(dir, config);
int numDocs = between(1, 10);
List<String> liveDocs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,7 @@ public void testMergeSegmentsOnCommitIsDisabled() throws Exception {

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down Expand Up @@ -572,7 +571,7 @@ public void testMergeSegmentsOnCommit() throws Exception {
final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down Expand Up @@ -634,14 +633,52 @@ public void testMergeSegmentsOnCommit() throws Exception {
}
}

public void testMergeSegmentsOnCommitDefault() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings());
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

final TieredMergePolicy mergePolicy = new TieredMergePolicy();
mergePolicy.setSegmentsPerTier(2);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get)
)
) {
List<Segment> segments = engine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));

ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
engine.refresh("test");

segments = engine.segments(true);
assertThat(segments.size(), equalTo(1));

ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null);
engine.index(indexForDoc(doc2));
engine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_3, null);
engine.index(indexForDoc(doc3));
engine.refresh("test");

segments = engine.segments(true);
assertThat(segments.size(), equalTo(2));
}
}

// this test writes documents to the engine while concurrently flushing/commit
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ public void testGetForUpdate() throws IOException {
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)

.build();
IndexMetadata metadata = IndexMetadata.builder("test")
.putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")
Expand Down

0 comments on commit ce3af3c

Please sign in to comment.