Skip to content

Commit

Permalink
Move segment replication setting to IndexMetadata so that it can be s…
Browse files Browse the repository at this point in the history
…et during index creation.

This change also adds an integration test - SegmentReplicationIT.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Feb 28, 2022
1 parent d547104 commit eb2579c
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

private static final String INDEX_NAME = "test-idx-1";
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
waitForDocs(initialDocCount, indexer);
}
refresh(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
flush(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,17 @@ public Iterator<Setting<?>> settings() {
Property.IndexScope
);

public static final String SETTING_SEGMENT_REPLICATION = "index.replication.segment_replication";
/**
* Used to specify if the index should use segment replication. If false, document replication is used.
*/
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting(
SETTING_SEGMENT_REPLICATION,
false,
Property.IndexScope,
Property.Final
);

public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INDEX_DATA_PATH_SETTING,
IndexMetadata.INDEX_FORMAT_SETTING,
IndexMetadata.INDEX_HIDDEN_SETTING,
IndexMetadata.INDEX_SEGMENT_REPLICATION_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_DEBUG_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_WARN_SETTING,
SearchSlowLog.INDEX_SEARCH_SLOWLOG_THRESHOLD_FETCH_INFO_SETTING,
Expand Down Expand Up @@ -187,7 +188,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
17 changes: 6 additions & 11 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,22 +503,13 @@ public final class IndexSettings {
Setting.Property.IndexScope
);

/**
* Used to specify if the index should use segment replication. If false, document replication is used.
*/
public static final Setting<Boolean> INDEX_SEGMENT_REPLICATION_SETTING = Setting.boolSetting(
"index.replication.segment_replication",
false,
Property.IndexScope,
Property.Final
);

private final Index index;
private final Version version;
private final Logger logger;
private final String nodeName;
private final Settings nodeSettings;
private final int numberOfShards;
private final Boolean isSegrepEnabled;
// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
private volatile Settings settings;
private volatile IndexMetadata indexMetadata;
Expand Down Expand Up @@ -661,7 +652,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
nodeName = Node.NODE_NAME_SETTING.get(settings);
this.indexMetadata = indexMetadata;
numberOfShards = settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_SHARDS, null);

isSegrepEnabled = settings.getAsBoolean(IndexMetadata.SETTING_SEGMENT_REPLICATION, false);
this.searchThrottled = INDEX_SEARCH_THROTTLED.get(settings);
this.queryStringLenient = QUERY_STRING_LENIENT_SETTING.get(settings);
this.queryStringAnalyzeWildcard = QUERY_STRING_ANALYZE_WILDCARD.get(nodeSettings);
Expand Down Expand Up @@ -891,6 +882,10 @@ public int getNumberOfReplicas() {
return settings.getAsInt(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, null);
}

public boolean isSegrepEnabled() {
return isSegrepEnabled;
}

/**
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
* index settings and the node settings where node settings are overwritten by index settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ public Engine.IndexResult applyIndexOperationOnReplica(
boolean isRetry,
SourceToParse sourceToParse
) throws IOException {
Boolean isSegRepEnabled = indexSettings.getValue(IndexSettings.INDEX_SEGMENT_REPLICATION_SETTING);
Boolean isSegRepEnabled = indexSettings.isSegrepEnabled();
if (isSegRepEnabled != null && isSegRepEnabled) {
Engine.Index index;
try {
Expand Down

0 comments on commit eb2579c

Please sign in to comment.