Skip to content

Commit

Permalink
Added support for feature flags in opensearch.yml (opensearch-project…
Browse files Browse the repository at this point in the history
…#4959)

This change introduces a static store "settings"
in FeatureFlags.java file to enable isEnabled method to
fetch flag settings defined in opensearch.yml.

Signed-off-by: Nagaraj Tantri <[email protected]>

(cherry picked from commit 241bd42)
  • Loading branch information
ntantri committed Jan 8, 2023
1 parent 06c1712 commit d4e80b8
Show file tree
Hide file tree
Showing 9 changed files with 497 additions and 33 deletions.
16 changes: 15 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,27 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Update to Gradle 7.6 ([#5382](https://github.com/opensearch-project/OpenSearch/pull/5382))
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))
- Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229))
- Added jackson dependency to server ([#5366] (https://github.com/opensearch-project/OpenSearch/pull/5366))
- Added jackson dependency to server ([#5366](https://github.com/opensearch-project/OpenSearch/pull/5366))
- Adding support to register settings dynamically ([#5495](https://github.com/opensearch-project/OpenSearch/pull/5495))
- Adding auto release workflow ([#5582](https://github.com/opensearch-project/OpenSearch/pull/5582))
- Added experimental support for extensions ([#5347](https://github.com/opensearch-project/OpenSearch/pull/5347)), ([#5518](https://github.com/opensearch-project/OpenSearch/pull/5518)), ([#5597](https://github.com/opensearch-project/OpenSearch/pull/5597)), ([#5615](https://github.com/opensearch-project/OpenSearch/pull/5615)))
- Add CI bundle pattern to distribution download ([#5348](https://github.com/opensearch-project/OpenSearch/pull/5348))
- Added @gbbafna as an OpenSearch maintainer ([#5668](https://github.com/opensearch-project/OpenSearch/pull/5668))
- Experimental support for extended backward compatiblity in searchable snapshots ([#5429](https://github.com/opensearch-project/OpenSearch/pull/5429))
- Add feature flag for extensions ([#5211](https://github.com/opensearch-project/OpenSearch/pull/5211))
- Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001))
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325)
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
- BWC version 2.2.2 ([#4383](https://github.com/opensearch-project/OpenSearch/pull/4383))
- Support for labels on version bump PRs, skip label support for changelog verifier ([#4391](https://github.com/opensearch-project/OpenSearch/pull/4391))
- Add a new node role 'search' which is dedicated to provide search capability ([#4689](https://github.com/opensearch-project/OpenSearch/pull/4689))
- Introduce experimental searchable snapshot API ([#4680](https://github.com/opensearch-project/OpenSearch/pull/4680))
- Added support for feature flags in opensearch.yml ([#4959](https://github.com/opensearch-project/OpenSearch/pull/4959))

### Dependencies
- Bump bcpg-fips from 1.0.5.1 to 1.0.7.1 ([#5148](https://github.com/opensearch-project/OpenSearch/pull/5148))
Expand Down
23 changes: 23 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,26 @@ ${path.logs}
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true
#
# ---------------------------------- Experimental Features -----------------------------------
#
# Gates the visibility of the index setting that allows changing of replication type.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.replication_type.enabled: false
#
#
# Gates the visibility of the index setting that allows persisting data to remote store along with local disk.
# Once the feature is ready for production release, this feature flag can be removed.
#
#opensearch.experimental.feature.remote_store.enabled: false
#
#
# Gates the functionality of a new parameter to the snapshot restore API
# that allows for creation of a new index type that searches a snapshot
# directly in a remote repository without restoring all index data to disk
# ahead of time.
#
#opensearch.experimental.feature.searchable_snapshot.enabled: false
#
#
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.OpenSearchCorruptionException;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
Expand Down Expand Up @@ -66,11 +65,6 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@BeforeClass
public static void assumeFeatureFlag() {
assumeTrue("Segment replication Feature flag is enabled", Boolean.parseBoolean(System.getProperty(FeatureFlags.REPLICATION_TYPE)));
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
Expand All @@ -92,11 +86,16 @@ protected boolean addMockInternalEngine() {
return false;
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();
}

public void testPrimaryStopped_ReplicaPromoted() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
Expand All @@ -123,7 +122,7 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);

// start another node, index another doc and replicate.
String nodeC = internalCluster().startNode();
String nodeC = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
client().prepareIndex(INDEX_NAME).setId("4").setSource("baz", "baz").get();
refresh(INDEX_NAME);
Expand All @@ -134,10 +133,10 @@ public void testPrimaryStopped_ReplicaPromoted() throws Exception {
}

public void testRestartPrimary() throws Exception {
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertEquals(getNodeContainingPrimaryShard().getName(), primary);
Expand All @@ -163,10 +162,10 @@ public void testRestartPrimary() throws Exception {

public void testCancelPrimaryAllocation() throws Exception {
// this test cancels allocation on the primary - promoting the new replica and recreating the former primary as a replica.
final String primary = internalCluster().startNode();
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startNode();
final String replica = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

final int initialDocCount = 1;
Expand Down Expand Up @@ -197,10 +196,13 @@ public void testCancelPrimaryAllocation() throws Exception {

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testAddNewReplicaFailure() throws Exception {
logger.info("--> starting [Primary Node] ...");
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());

logger.info("--> creating test index ...");
prepareCreate(
Expand All @@ -223,7 +225,7 @@ public void testAddNewReplicaFailure() throws Exception {
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

// Mock transport service to add behaviour of throwing corruption exception during segment replication process.
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(
Expand Down Expand Up @@ -265,8 +267,8 @@ public void testAddNewReplicaFailure() throws Exception {
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -351,8 +353,8 @@ public void testMultipleShards() throws Exception {
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, indexSettings);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -392,8 +394,8 @@ public void testMultipleShards() throws Exception {
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

Expand Down Expand Up @@ -437,11 +439,11 @@ public void testReplicationAfterForceMerge() throws Exception {
}

public void testCancellation() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
ensureYellow(INDEX_NAME);

final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());

final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster().getInstance(
SegmentReplicationSourceService.class,
Expand Down Expand Up @@ -496,7 +498,7 @@ public void testCancellation() throws Exception {
}

public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode();
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

Expand All @@ -519,7 +521,7 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
final String replicaNode = internalCluster().startNode();
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
Expand All @@ -534,8 +536,8 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
}

public void testDeleteOperations() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());

createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -595,7 +597,59 @@ public void testDeleteOperations() throws Exception {
}, 5, TimeUnit.SECONDS);
}
}
public void testUpdateOperations() throws Exception {
final String primary = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureYellow(INDEX_NAME);
final String replica = internalCluster().startNode(featureFlagSettings());

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();

// wait a short amount of time to give replication a chance to complete.
assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);
waitForReplicaUpdate();

assertHitCount(client(primary).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(replica).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

Set<String> ids = indexer.getIds();
String id = ids.toArray()[0].toString();
UpdateResponse updateResponse = client(primary).prepareUpdate(INDEX_NAME, id)
.setDoc(Requests.INDEX_CONTENT_TYPE, "foo", "baz")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();
assertFalse("request shouldn't have forced a refresh", updateResponse.forcedRefresh());
assertEquals(2, updateResponse.getVersion());

refresh(INDEX_NAME);
waitForReplicaUpdate();

assertSearchHits(client(primary).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);
assertSearchHits(client(replica).prepareSearch(INDEX_NAME).setQuery(matchQuery("foo", "baz")).get(), id);

}
}
private void assertSegmentStats(int numberOfReplicas) throws IOException {
final IndicesSegmentResponse indicesSegmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest()).actionGet();

Expand Down Expand Up @@ -647,10 +701,10 @@ public void testDropPrimaryDuringReplication() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 6)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode(Settings.EMPTY);
final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(featureFlagSettings());
final String primaryNode = internalCluster().startDataOnlyNode(featureFlagSettings());
createIndex(INDEX_NAME, settings);
internalCluster().startDataOnlyNodes(6);
internalCluster().startDataOnlyNodes(6, featureFlagSettings());
ensureGreen(INDEX_NAME);

int initialDocCount = scaledRandomIntBetween(100, 200);
Expand All @@ -673,7 +727,7 @@ public void testDropPrimaryDuringReplication() throws Exception {
ensureYellow(INDEX_NAME);

// start another replica.
internalCluster().startDataOnlyNode();
internalCluster().startDataOnlyNode(featureFlagSettings());
ensureGreen(INDEX_NAME);

// index another doc and refresh - without this the new replica won't catch up.
Expand Down
Loading

0 comments on commit d4e80b8

Please sign in to comment.