diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 8841794e0e492..be1db6635e162 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -193,6 +193,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING, IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED, IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, + IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index e40acb94ee498..322d05ca86ff0 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -32,11 +32,12 @@ package org.opensearch.index; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.util.Strings; import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy; import org.opensearch.LegacyESVersion; import org.opensearch.Version; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.Strings; import org.opensearch.common.logging.Loggers; import org.opensearch.common.settings.IndexScopedSettings; import org.opensearch.common.settings.Setting; @@ -53,9 +54,11 @@ import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.UnaryOperator; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_DEPTH_LIMIT_SETTING; import static org.opensearch.index.mapper.MapperService.INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING; @@ -73,6 +76,9 @@ * @opensearch.internal */ public final class IndexSettings { + private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; + private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; + public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -515,14 +521,21 @@ public final class IndexSettings { public static final Setting INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting( "index.merge_on_flush.max_full_flush_merge_wait_time", new TimeValue(10, TimeUnit.SECONDS), - new TimeValue(0, TimeUnit.MILLISECONDS), + new TimeValue(1, TimeUnit.MILLISECONDS), Property.Dynamic, Property.IndexScope ); public static final Setting INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting( "index.merge_on_flush.enabled", - false, + true, /* https://issues.apache.org/jira/browse/LUCENE-10078 */ + Property.IndexScope, + Property.Dynamic + ); + + public static final Setting INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString( + "index.merge_on_flush.policy", + MERGE_ON_FLUSH_DEFAULT_POLICY, Property.IndexScope, Property.Dynamic ); @@ -617,6 +630,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { * Is merge of flush enabled or not */ private volatile boolean mergeOnFlushEnabled; + /** + * Specialized merge-on-flush policy if provided + */ + private volatile UnaryOperator mergeOnFlushPolicy; /** * Returns the default search fields for this index. @@ -733,6 +750,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING); maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME); mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED); + setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer( @@ -804,6 +822,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); + scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { @@ -874,7 +893,7 @@ public String getUUID() { * Returns true if the index has a custom data path */ public boolean hasCustomDataPath() { - return Strings.isNotEmpty(customDataPath()); + return !Strings.isEmpty(customDataPath()); } /** @@ -1390,4 +1409,27 @@ public TimeValue getMaxFullFlushMergeWaitTime() { public boolean isMergeOnFlushEnabled() { return mergeOnFlushEnabled; } + + private void setMergeOnFlushPolicy(String policy) { + if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) { + mergeOnFlushPolicy = null; + } else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) { + this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new; + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + MERGE_ON_FLUSH_DEFAULT_POLICY + + ", " + + MERGE_ON_FLUSH_MERGE_POLICY + ); + } + } + + public Optional> getMergeOnFlushPolicy() { + return Optional.ofNullable(mergeOnFlushPolicy); + } } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 23321db4f1a65..2832864a8a514 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -50,7 +50,6 @@ import org.apache.lucene.index.ShuffleForcedMergePolicy; import org.apache.lucene.index.SoftDeletesRetentionMergePolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.DocIdSetIterator; @@ -133,6 +132,7 @@ import java.util.function.BiFunction; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.function.UnaryOperator; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -2448,14 +2448,14 @@ private IndexWriterConfig getIndexWriterConfig() { final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis(); if (maxFullFlushMergeWaitMillis > 0) { iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis); - mergePolicy = new MergeOnFlushMergePolicy(mergePolicy); - } else { - logger.warn( - "The {} is enabled but {} is set to 0, merge on flush will not be activated", - IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), - IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey() - ); + final Optional> mergeOnFlushPolicy = config().getIndexSettings().getMergeOnFlushPolicy(); + if (mergeOnFlushPolicy.isPresent()) { + mergePolicy = mergeOnFlushPolicy.get().apply(mergePolicy); + } } + } else { + // Disable merge on refresh + iwc.setMaxFullFlushMergeWaitMillis(0); } iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); diff --git a/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java b/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java index 776b44d346fb5..0edcd55cc35c3 100644 --- a/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java +++ b/server/src/test/java/org/opensearch/common/lucene/LuceneTests.java @@ -590,10 +590,8 @@ public void testWrapAllDocsLive() throws Exception { public void testWrapLiveDocsNotExposeAbortedDocuments() throws Exception { Directory dir = newDirectory(); IndexWriterConfig config = newIndexWriterConfig().setSoftDeletesField(Lucene.SOFT_DELETES_FIELD) - .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy())); - // override 500ms default introduced in - // https://issues.apache.org/jira/browse/LUCENE-10078 - config.setMaxFullFlushMergeWaitMillis(0); + .setMergePolicy(new SoftDeletesRetentionMergePolicy(Lucene.SOFT_DELETES_FIELD, MatchAllDocsQuery::new, newMergePolicy())) + .setMaxFullFlushMergeWaitMillis(0); IndexWriter writer = new IndexWriter(dir, config); int numDocs = between(1, 10); List liveDocs = new ArrayList<>(); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 8ffd9af7e653d..b04331fd40a21 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -499,8 +499,7 @@ public void testMergeSegmentsOnCommitIsDisabled() throws Exception { final Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) - .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0)) - .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true); + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), false); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -572,7 +571,7 @@ public void testMergeSegmentsOnCommit() throws Exception { final Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000)) - .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true); + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush"); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); @@ -634,6 +633,44 @@ public void testMergeSegmentsOnCommit() throws Exception { } } + public void testMergeSegmentsOnCommitDefault() throws Exception { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + + final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()); + final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); + final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); + + final TieredMergePolicy mergePolicy = new TieredMergePolicy(); + mergePolicy.setSegmentsPerTier(2); + + try ( + Store store = createStore(); + InternalEngine engine = createEngine( + config(indexSettings, store, createTempDir(), mergePolicy, null, null, globalCheckpoint::get) + ) + ) { + List segments = engine.segments(true); + assertThat(segments.isEmpty(), equalTo(true)); + + ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null); + engine.index(indexForDoc(doc)); + engine.refresh("test"); + + segments = engine.segments(true); + assertThat(segments.size(), equalTo(1)); + + ParsedDocument doc2 = testParsedDocument("2", null, testDocumentWithTextField(), B_2, null); + engine.index(indexForDoc(doc2)); + engine.refresh("test"); + ParsedDocument doc3 = testParsedDocument("3", null, testDocumentWithTextField(), B_3, null); + engine.index(indexForDoc(doc3)); + engine.refresh("test"); + + segments = engine.segments(true); + assertThat(segments.size(), equalTo(2)); + } + } + // this test writes documents to the engine while concurrently flushing/commit public void testConcurrentMergeSegmentsOnCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); @@ -641,7 +678,7 @@ public void testConcurrentMergeSegmentsOnCommit() throws Exception { final Settings.Builder settings = Settings.builder() .put(defaultSettings.getSettings()) .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000)) - .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true); + .put(IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY.getKey(), "merge-on-flush"); final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build(); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata); diff --git a/server/src/test/java/org/opensearch/index/shard/ShardGetServiceTests.java b/server/src/test/java/org/opensearch/index/shard/ShardGetServiceTests.java index 5dd053574268e..f29303ce8dda1 100644 --- a/server/src/test/java/org/opensearch/index/shard/ShardGetServiceTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ShardGetServiceTests.java @@ -56,7 +56,6 @@ public void testGetForUpdate() throws IOException { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .build(); IndexMetadata metadata = IndexMetadata.builder("test") .putMapping("{ \"properties\": { \"foo\": { \"type\": \"text\"}}}")