Skip to content

Commit

Permalink
[Remote Store] Integrate remote segment store in peer recovery flow (#…
Browse files Browse the repository at this point in the history
…6664)

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored Mar 25, 2023
1 parent 9febe10 commit 07565ad
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- [Remote Store] Integrate remote segment store in peer recovery flow ([#6664](https://github.com/opensearch-project/OpenSearch/pull/6664))
- [Segment Replication] Add new cluster setting to set replication strategy by default for all indices in cluster. ([#6791](https://github.com/opensearch-project/OpenSearch/pull/6791))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.UUIDs;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.IndexModule;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
Expand All @@ -30,6 +33,8 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
Expand Down Expand Up @@ -207,4 +212,93 @@ public void testRemoteTranslogRestoreWithRefreshedData() throws IOException {
public void testRemoteTranslogRestoreWithCommittedData() throws IOException {
testRestoreFlow(true, randomIntBetween(2, 5), true);
}

private void testPeerRecovery(boolean remoteTranslog, int numberOfIterations, boolean invokeFlush) throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(0));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(0));
}
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

Map<String, Long> indexStats = indexData(numberOfIterations, invokeFlush);

client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
.get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

refresh(INDEX_NAME);
String replicaNodeName = replicaNodeName(INDEX_NAME);
assertBusy(
() -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS)),
30,
TimeUnit.SECONDS
);

RecoveryResponse recoveryResponse = client(replicaNodeName).admin().indices().prepareRecoveries().get();

Optional<RecoveryState> recoverySource = recoveryResponse.shardRecoveryStates()
.get(INDEX_NAME)
.stream()
.filter(rs -> rs.getRecoverySource().getType() == RecoverySource.Type.PEER)
.findFirst();
assertFalse(recoverySource.isEmpty());
if (numberOfIterations == 1 && invokeFlush) {
// segments_N file is copied to new replica
assertEquals(1, recoverySource.get().getIndex().recoveredFileCount());
} else {
assertEquals(0, recoverySource.get().getIndex().recoveredFileCount());
}

IndexResponse response = indexSingleDoc();
assertEquals(indexStats.get(MAX_SEQ_NO_TOTAL) + 1, response.getSeqNo());
refresh(INDEX_NAME);
assertBusy(
() -> assertHitCount(client(replicaNodeName).prepareSearch(INDEX_NAME).setSize(0).get(), indexStats.get(TOTAL_OPERATIONS) + 1),
30,
TimeUnit.SECONDS
);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(false, 1, true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogFlush() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), true);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(false, 1, false);
}

public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exception {
testPeerRecovery(false, randomIntBetween(2, 5), false);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception {
testPeerRecovery(true, 1, true);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception {
testPeerRecovery(true, randomIntBetween(2, 5), true);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception {
testPeerRecovery(true, 1, false);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193")
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception {
testPeerRecovery(true, randomIntBetween(2, 5), false);
}
}
15 changes: 13 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4372,11 +4372,22 @@ public void close() throws IOException {
}

/**
* Downloads segments from remote segment store.
* Downloads segments from remote segment store. This method will download segments till
* last refresh checkpoint.
* @param overrideLocal flag to override local segment files with those in remote store
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException {
syncSegmentsFromRemoteSegmentStore(overrideLocal, true);
}

/**
* Downloads segments from remote segment store.
* @param overrideLocal flag to override local segment files with those in remote store
* @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise
* @throws IOException if exception occurs while reading segments from remote store
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
Expand Down Expand Up @@ -4415,7 +4426,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
skippedSegments.add(file);
}
}
if (segmentInfosSnapshotFilename != null) {
if (refreshLevelSegmentSync && segmentInfosSnapshotFilename != null) {
try (
ChecksumIndexInput indexInput = new BufferedChecksumIndexInput(
storeDirectory.openInput(segmentInfosSnapshotFilename, IOContext.DEFAULT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,13 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
indexShard.prepareForIndexRecovery();
final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled();
if (hasRemoteSegmentStore) {
indexShard.syncSegmentsFromRemoteSegmentStore(false, false);
}
final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot();
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog) == false;
final boolean verifyTranslog = (hasRemoteTranslog || hasNoTranslog || hasRemoteSegmentStore) == false;
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!hasRemoteTranslog);
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2449,4 +2449,11 @@ protected String primaryNodeName(String indexName) {
String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).primaryShard().currentNodeId();
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

protected String replicaNodeName(String indexName) {
ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
String nodeId = clusterState.getRoutingTable().index(indexName).shard(0).replicaShards().get(0).currentNodeId();
return clusterState.getRoutingNodes().node(nodeId).node().getName();
}

}

0 comments on commit 07565ad

Please sign in to comment.