Skip to content

Commit

Permalink
Introduce cluster default and minimum index refresh interval settings (
Browse files Browse the repository at this point in the history
…opensearch-project#9267)

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
  • Loading branch information
ashking94 authored and kaushalmahi12 committed Sep 12, 2023
1 parent b36fa4b commit 867f81d
Show file tree
Hide file tree
Showing 11 changed files with 762 additions and 50 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.metadata;

import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.indices.IndicesService;

public class ClusterIndexRefreshIntervalWithNodeSettingsIT extends ClusterIndexRefreshIntervalIT {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), getDefaultRefreshInterval())
.put(
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING.getKey(),
getMinRefreshIntervalForRefreshDisabled().toString()
)
.build();
}

@Override
protected TimeValue getMinRefreshIntervalForRefreshDisabled() {
return TimeValue.timeValueSeconds(1);
}

@Override
protected TimeValue getDefaultRefreshInterval() {
return TimeValue.timeValueSeconds(5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,11 @@
import org.opensearch.common.compress.CompressedXContent;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.logging.DeprecationLogger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
Expand Down Expand Up @@ -572,7 +574,8 @@ private ClusterState applyCreateIndexRequestWithV1Templates(
settings,
indexScopedSettings,
shardLimitValidator,
indexSettingProviders
indexSettingProviders,
clusterService.getClusterSettings()
);
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
Expand Down Expand Up @@ -636,7 +639,8 @@ private ClusterState applyCreateIndexRequestWithV2Template(
settings,
indexScopedSettings,
shardLimitValidator,
indexSettingProviders
indexSettingProviders,
clusterService.getClusterSettings()
);
int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
Expand Down Expand Up @@ -716,7 +720,8 @@ private ClusterState applyCreateIndexRequestWithExistingMetadata(
settings,
indexScopedSettings,
shardLimitValidator,
indexSettingProviders
indexSettingProviders,
clusterService.getClusterSettings()
);
final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, sourceMetadata);
IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards);
Expand Down Expand Up @@ -799,7 +804,8 @@ static Settings aggregateIndexSettings(
Settings settings,
IndexScopedSettings indexScopedSettings,
ShardLimitValidator shardLimitValidator,
Set<IndexSettingProvider> indexSettingProviders
Set<IndexSettingProvider> indexSettingProviders,
ClusterSettings clusterSettings
) {
// Create builders for the template and request settings. We transform these into builders
// because we may want settings to be "removed" from these prior to being set on the new
Expand Down Expand Up @@ -914,6 +920,7 @@ static Settings aggregateIndexSettings(
}
validateTranslogRetentionSettings(indexSettings);
validateStoreTypeSettings(indexSettings);
validateRefreshIntervalSettings(request.settings(), clusterSettings);

return indexSettings;
}
Expand Down Expand Up @@ -1468,4 +1475,27 @@ public static void validateTranslogRetentionSettings(Settings indexSettings) {
}
}
}

/**
* Validates {@code index.refresh_interval} is equal or below the {@code cluster.minimum.index.refresh_interval}.
*
* @param requestSettings settings passed in during index create request
* @param clusterSettings cluster setting
*/
static void validateRefreshIntervalSettings(Settings requestSettings, ClusterSettings clusterSettings) {
if (IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.exists(requestSettings) == false) {
return;
}
TimeValue requestRefreshInterval = IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.get(requestSettings);
TimeValue clusterMinimumRefreshInterval = clusterSettings.get(IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING);
if (requestRefreshInterval.millis() < clusterMinimumRefreshInterval.millis()) {
throw new IllegalArgumentException(
"invalid index.refresh_interval ["
+ requestRefreshInterval
+ "]: cannot be smaller than cluster.minimum.index.refresh_interval ["
+ clusterMinimumRefreshInterval
+ "]"
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public void updateSettings(
.put(request.settings())
.normalizePrefix(IndexMetadata.INDEX_SETTING_PREFIX)
.build();

MetadataCreateIndexService.validateRefreshIntervalSettings(normalizedSettings, clusterService.getClusterSettings());

Settings.Builder settingsForClosedIndices = Settings.builder();
Settings.Builder settingsForOpenIndices = Settings.builder();
final Set<String> skippedSettings = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
IndicesService.CLUSTER_REPLICATION_TYPE_SETTING,
Expand Down
7 changes: 5 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -598,7 +599,8 @@ public IndexService newIndexService(
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -654,7 +656,8 @@ public IndexService newIndexService(
expressionResolver,
valuesSourceRegistry,
recoveryStateFactory,
translogFactorySupplier
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier
);
success = true;
return indexService;
Expand Down
84 changes: 57 additions & 27 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final Supplier<Sort> indexSortSupplier;
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -208,7 +209,8 @@ public IndexService(
IndexNameExpressionResolver expressionResolver,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -275,6 +277,7 @@ public IndexService(
this.readerWrapper = wrapperFactory.apply(this);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.clusterDefaultRefreshIntervalSupplier = clusterDefaultRefreshIntervalSupplier;
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
Expand Down Expand Up @@ -895,36 +898,47 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
);
}
}
if (refreshTask.getInterval().equals(indexSettings.getRefreshInterval()) == false) {
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search requests
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after interval change", e);
}

@Override
protected void doRun() throws Exception {
maybeRefreshEngine(true);
}

@Override
public boolean isForceExecution() {
return true;
}
});
rescheduleRefreshTasks();
}
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
* indexes relying on cluster default.
* 2. {@code index.refresh_interval} index setting changes.
*/
public void onRefreshIntervalChange() {
if (refreshTask.getInterval().equals(getRefreshInterval())) {
return;
}
// once we change the refresh interval we schedule yet another refresh
// to ensure we are in a clean and predictable state.
// it doesn't matter if we move from or to <code>-1</code> in both cases we want
// docs to become visible immediately. This also flushes all pending indexing / search requests
// that are waiting for a refresh.
threadPool.executor(ThreadPool.Names.REFRESH).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after interval change", e);
}

@Override
protected void doRun() throws Exception {
maybeRefreshEngine(true);
}

@Override
public boolean isForceExecution() {
return true;
}
});
rescheduleRefreshTasks();
}

private void updateFsyncTaskIfNecessary() {
if (indexSettings.getTranslogDurability() == Translog.Durability.REQUEST) {
try {
Expand Down Expand Up @@ -989,7 +1003,7 @@ private void maybeFSyncTranslogs() {
}

private void maybeRefreshEngine(boolean force) {
if (indexSettings.getRefreshInterval().millis() > 0 || force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
shard.scheduledRefresh();
Expand Down Expand Up @@ -1060,6 +1074,17 @@ private void sync(final Consumer<IndexShard> sync, final String source) {
}
}

/**
* Gets the refresh interval seen by the index service. Index setting overrides takes the highest precedence.
* @return the refresh interval.
*/
private TimeValue getRefreshInterval() {
if (getIndexSettings().isExplicitRefresh()) {
return getIndexSettings().getRefreshInterval();
}
return clusterDefaultRefreshIntervalSupplier.get();
}

/**
* Base asynchronous task
*
Expand Down Expand Up @@ -1120,7 +1145,7 @@ public String toString() {
final class AsyncRefreshTask extends BaseAsyncTask {

AsyncRefreshTask(IndexService indexService) {
super(indexService, indexService.getIndexSettings().getRefreshInterval());
super(indexService, indexService.getRefreshInterval());
}

@Override
Expand Down Expand Up @@ -1242,6 +1267,11 @@ AsyncRefreshTask getRefreshTask() { // for tests
return refreshTask;
}

// Visible for test
public TimeValue getRefreshTaskInterval() {
return refreshTask.getInterval();
}

AsyncTranslogFSync getFsyncTask() { // for tests
return fsyncTask;
}
Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -299,10 +299,11 @@ public final class IndexSettings {
Property.Deprecated
);
public static final TimeValue DEFAULT_REFRESH_INTERVAL = new TimeValue(1, TimeUnit.SECONDS);
public static final TimeValue MINIMUM_REFRESH_INTERVAL = new TimeValue(-1, TimeUnit.MILLISECONDS);
public static final Setting<TimeValue> INDEX_REFRESH_INTERVAL_SETTING = Setting.timeSetting(
"index.refresh_interval",
DEFAULT_REFRESH_INTERVAL,
new TimeValue(-1, TimeUnit.MILLISECONDS),
MINIMUM_REFRESH_INTERVAL,
Property.Dynamic,
Property.IndexScope
);
Expand Down
Loading

0 comments on commit 867f81d

Please sign in to comment.