Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into rs-snap-change-ver
Browse files Browse the repository at this point in the history
  • Loading branch information
shourya035 authored Sep 7, 2023
2 parents 178fa8c + 224accf commit 1e06a27
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 153 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665))
- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709))
- [Remote Store] Changes to introduce repository registration during bootstrap via node attributes. ([#9105](https://github.com/opensearch-project/OpenSearch/pull/9105))
- [Remote state] Auto restore index metadata from last known cluster state ([#9831](https://github.com/opensearch-project/OpenSearch/pull/9831))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private void assertReindexingWithMultipleCodecs(String destCodec, String destCod
}

private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
assertAcked(client().admin().indices().prepareClose(index));
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));

assertAcked(
client().admin()
Expand All @@ -144,7 +144,7 @@ private void useCodec(String index, String codec) throws ExecutionException, Int
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
}

private void flushAndRefreshIndex(String index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
);
ensureGreen(index);

assertAcked(client().admin().indices().prepareClose(index));
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));

Throwable executionException = expectThrows(
ExecutionException.class,
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
ensureGreen(index);
}

Expand All @@ -148,7 +148,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
);
ensureGreen(index);

assertAcked(client().admin().indices().prepareClose(index));
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));

Throwable executionException = expectThrows(
ExecutionException.class,
Expand Down Expand Up @@ -181,7 +181,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
ensureGreen(index);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(CustomCodecPlugin.class);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9872")
public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException {

Map<String, String> codecMap = Map.of(
Expand Down Expand Up @@ -119,7 +118,7 @@ private void forceMergeMultipleCodecs(String finalCodec, String finalCodecMode,
}

private void useCodec(String index, String codec) throws ExecutionException, InterruptedException {
assertAcked(client().admin().indices().prepareClose(index));
assertAcked(client().admin().indices().prepareClose(index).setWaitForActiveShards(1));

assertAcked(
client().admin()
Expand All @@ -128,7 +127,7 @@ private void useCodec(String index, String codec) throws ExecutionException, Int
.get()
);

assertAcked(client().admin().indices().prepareOpen(index));
assertAcked(client().admin().indices().prepareOpen(index).setWaitForActiveShards(1));
}

private void ingestDocs(String index) throws InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ private void printClusterRouting() throws IOException, ParseException {
* This test verifies that segment replication does not break when primary shards are on lower OS version. It does this
* by verifying replica shards contains same number of documents as primary's.
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/9685")
public void testIndexingWithPrimaryOnBwcNodes() throws Exception {
if (UPGRADE_FROM_VERSION.before(Version.V_2_4_0)) {
logger.info("--> Skip test for version {} where segment replication feature is not available", UPGRADE_FROM_VERSION);
Expand Down
42 changes: 26 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
Expand Down Expand Up @@ -126,7 +128,8 @@ public void start(
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService,
PersistedStateRegistry persistedStateRegistry
PersistedStateRegistry persistedStateRegistry,
RemoteStoreRestoreService remoteStoreRestoreService
) {
assert this.persistedStateRegistry == null : "Persisted state registry should only be set once";
this.persistedStateRegistry = persistedStateRegistry;
Expand Down Expand Up @@ -154,7 +157,7 @@ public void start(
PersistedState remotePersistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(
ClusterState clusterState = prepareInitialClusterState(
transportService,
clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
Expand All @@ -164,10 +167,28 @@ public void start(
);

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (isRemoteStoreClusterStateEnabled(settings)) {
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
String lastKnownClusterUUID = remoteClusterStateService.getLastKnownUUIDFromRemote(
clusterState.getClusterName().value()
);
if (!ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID)) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
lastKnownClusterUUID,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand Down Expand Up @@ -651,12 +672,6 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
Expand Down Expand Up @@ -706,13 +721,8 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
Expand Down
Loading

0 comments on commit 1e06a27

Please sign in to comment.