Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into remote-state-stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Aman Khare committed Oct 23, 2023
2 parents 6606b53 + 8f13dee commit a1efd42
Show file tree
Hide file tree
Showing 12 changed files with 449 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile query using concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
Expand All @@ -23,15 +25,20 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
/**
 * Returns the plugins installed on the test nodes: only the mock transport service test plugin.
 * NOTE(review): presumably needed so that the NetworkDisruption in
 * testStatsCorrectnessOnFailover can disconnect nodes - confirm.
 */
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
    final List<Class<? extends Plugin>> plugins = Arrays.asList(MockTransportService.TestPlugin.class);
    return plugins;
}

/**
 * Starts the three-node cluster used by the tests that call this helper explicitly
 * as their first statement (for example testStatsResponseFromAllNodes).
 */
public void setup() {
    final int nodeCount = 3;
    internalCluster().startNodes(nodeCount);
}

public void testStatsResponseFromAllNodes() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() {
}

public void testStatsResponseAllShards() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -175,6 +188,7 @@ public void testStatsResponseAllShards() {
}

public void testStatsResponseFromLocalNode() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
Expand Down Expand Up @@ -236,6 +250,7 @@ public void testStatsResponseFromLocalNode() {
}

public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exce
}

public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
// - Disable Refresh Interval for the index
Expand Down Expand Up @@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() thr
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Index documents
Expand Down Expand Up @@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() {
}

public void testStatsOnShardUnassigned() throws IOException {
setup();
// Scenario:
// - Create index with single primary and two replica shards
// - Index documents
Expand All @@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException {
}

public void testStatsOnRemoteStoreRestore() throws IOException {
setup();
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
Expand Down Expand Up @@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
}

public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
setup();
// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exce
}, 5, TimeUnit.SECONDS);
}

/**
 * Checks the remote store stats reported for shard 0 after the node holding the primary is cut off
 * from the rest of the cluster.
 *
 * Scenario:
 * - Start one cluster-manager-only node and two data-only nodes with short leader/follower check
 *   timeouts and a single retry
 * - Create an index with one primary and one replica shard, index docs and refresh
 * - Partition the primary's node away from the cluster manager and the replica's node
 * - Wait for a stable two-node cluster, then assert that exactly one stats entry exists for
 *   shard 0 and that its refresh time lag is 0
 * - Heal the partition and wait for the cluster and index to recover
 */
public void testStatsCorrectnessOnFailover() {
    // Fault-detection overrides are applied first; nodeSettings(0) is applied last, as before.
    final Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms");
    settingsBuilder.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms");
    settingsBuilder.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1);
    settingsBuilder.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms");
    settingsBuilder.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms");
    settingsBuilder.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1);
    settingsBuilder.put(nodeSettings(0));
    final Settings nodeConfig = settingsBuilder.build();

    final String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(nodeConfig);
    internalCluster().startDataOnlyNodes(2, nodeConfig);

    // Create an index with one primary and one replica shard
    createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
    ensureGreen(INDEX_NAME);

    // Index some docs and refresh
    indexDocs();
    refresh(INDEX_NAME);

    final String primaryNode = primaryNodeName(INDEX_NAME);
    final String replicaNode = replicaNodeName(INDEX_NAME);

    // Start network disruption - primary node will be isolated
    final Set<String> majoritySide = new HashSet<>();
    majoritySide.add(clusterManagerNode);
    majoritySide.add(replicaNode);
    final Set<String> isolatedSide = new HashSet<>();
    isolatedSide.add(primaryNode);
    final NetworkDisruption partition = new NetworkDisruption(
        new NetworkDisruption.TwoPartitions(majoritySide, isolatedSide),
        NetworkDisruption.DISCONNECT
    );
    internalCluster().setDisruptionScheme(partition);
    logger.info("--> network disruption is started");
    partition.startDisrupting();
    ensureStableCluster(2, clusterManagerNode);

    // Query stats through the cluster manager, which sits on the majority side of the partition.
    final RemoteStoreStatsResponse statsResponse = client(clusterManagerNode).admin()
        .cluster()
        .prepareRemoteStoreStats(INDEX_NAME, "0")
        .get();
    final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
    final List<RemoteStoreStats> shardStats = new ArrayList<>();
    for (RemoteStoreStats candidate : statsResponse.getRemoteStoreStats()) {
        if (indexShardId.equals(candidate.getSegmentStats().shardId.toString())) {
            shardStats.add(candidate);
        }
    }
    assertEquals(1, shardStats.size());
    final RemoteSegmentTransferTracker.Stats segmentStats = shardStats.get(0).getSegmentStats();
    assertEquals(0, segmentStats.refreshTimeLagMs);

    // Heal the partition and wait for the full cluster to come back.
    partition.stopDisrupting();
    internalCluster().clearDisruptionScheme();
    ensureStableCluster(3, clusterManagerNode);
    ensureGreen(INDEX_NAME);
    logger.info("Test completed");
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING,

// Settings for remote store enablement
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

// TODO make this two variable as dynamic setting [issue: #10688]
public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000;
/**
 * Default value (20000 ms) of {@link #INDEX_METADATA_UPLOAD_TIMEOUT_SETTING}.
 */
public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

/**
 * Default value (20000 ms) of {@link #GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING}.
 */
public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

/**
 * Dynamic, node-scoped setting for how long the index metadata upload is awaited
 * before it is treated as timed out.
 */
public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
 * Dynamic, node-scoped setting for how long the global metadata upload is awaited
 * before it is treated as timed out.
 */
public static final Setting<TimeValue> GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.global_metadata.upload_timeout",
GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
Expand Down Expand Up @@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);
private final RemotePersistenceStats remoteStateStats;
public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
Expand Down Expand Up @@ -171,7 +188,11 @@ public RemoteClusterStateService(
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
}

Expand Down Expand Up @@ -372,7 +393,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
);

try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
Expand Down Expand Up @@ -427,7 +448,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
}

try {
if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
Expand Down Expand Up @@ -626,6 +647,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

/**
 * Settings-update consumer for {@code INDEX_METADATA_UPLOAD_TIMEOUT_SETTING}; stores the new timeout.
 *
 * @param newIndexMetadataUploadTimeout the updated index metadata upload timeout
 */
private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
    indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
}

/**
 * Settings-update consumer for {@code GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING}; stores the new timeout.
 *
 * @param newGlobalMetadataUploadTimeout the updated global metadata upload timeout
 */
private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) {
    globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

/**
 * Returns the currently configured index metadata upload timeout.
 *
 * @return the timeout used as the wait bound for the index metadata upload
 */
public TimeValue getIndexMetadataUploadTimeout() {
    return indexMetadataUploadTimeout;
}

/**
 * Returns the currently configured global metadata upload timeout.
 *
 * @return the timeout used as the wait bound for the global metadata upload
 */
public TimeValue getGlobalMetadataUploadTimeout() {
    return globalMetadataUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,14 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
 * Dynamic, index-scoped integer setting with a default of 100 and a minimum of 0.
 * NOTE(review): presumably the number of extra remote translog generations to retain -
 * confirm against the translog code that reads it.
 */
public static final Setting<Integer> INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING = Setting.intSetting(
"index.remote_store.translog.keep_extra_gen",
100,
0,
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand All @@ -680,6 +688,7 @@ public static IndexMergePolicy fromString(String text) {
private final String remoteStoreTranslogRepository;
private final String remoteStoreRepository;
private final boolean isRemoteSnapshot;
// Updated at runtime by the INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING update consumer and read
// through a public getter, so it is volatile to make updates visible to reader threads.
private volatile int remoteTranslogKeepExtraGen;
private Version extendedCompatibilitySnapshotVersion;

// volatile fields are updated via #updateIndexMetadata(IndexMetadata) under lock
Expand Down Expand Up @@ -850,6 +859,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
remoteStoreTranslogRepository = settings.get(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY);
remoteTranslogUploadBufferInterval = INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
remoteStoreRepository = settings.get(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY);
this.remoteTranslogKeepExtraGen = INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.get(settings);
isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(this.settings);

if (isRemoteSnapshot && FeatureFlags.isEnabled(SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY)) {
Expand Down Expand Up @@ -1021,6 +1031,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
this::setRemoteTranslogUploadBufferInterval
);
scopedSettings.addSettingsUpdateConsumer(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING, this::setRemoteTranslogKeepExtraGen);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down Expand Up @@ -1300,6 +1311,10 @@ public TimeValue getRemoteTranslogUploadBufferInterval() {
return remoteTranslogUploadBufferInterval;
}

/**
 * Returns the current value of the {@code index.remote_store.translog.keep_extra_gen} setting.
 */
public int getRemoteTranslogExtraKeep() {
    return this.remoteTranslogKeepExtraGen;
}

/**
* Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set.
*/
Expand All @@ -1311,6 +1326,10 @@ public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUpload
this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval;
}

/**
 * Stores a new value for the {@code index.remote_store.translog.keep_extra_gen} setting;
 * registered as the update consumer for that setting.
 *
 * @param extraGen the updated setting value
 */
public void setRemoteTranslogKeepExtraGen(int extraGen) {
    remoteTranslogKeepExtraGen = extraGen;
}

/**
* Returns this interval in which the shards of this index are asynchronously refreshed. {@code -1} means async refresh is disabled.
*/
Expand Down
Loading

0 comments on commit a1efd42

Please sign in to comment.