Skip to content

Commit

Permalink
Segregate shard level tests for node to node and remote store segment…
Browse files Browse the repository at this point in the history
… replication

Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 committed Aug 1, 2023
1 parent eb29746 commit 60126e5
Show file tree
Hide file tree
Showing 8 changed files with 725 additions and 570 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -24,7 +23,6 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -134,24 +132,6 @@ protected void waitForSearchableDocs(long docCount, String... nodes) throws Exce
waitForSearchableDocs(docCount, Arrays.stream(nodes).collect(Collectors.toList()));
}

protected void waitForSegmentReplication(String node) throws Exception {
assertBusy(() -> {
SegmentReplicationStatsResponse segmentReplicationStatsResponse = client(node).admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.setDetailed(true)
.execute()
.actionGet();
final SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats()
.get(INDEX_NAME)
.get(0);
assertEquals(
perGroupStats.getReplicaStats().stream().findFirst().get().getCurrentReplicationState().getStage(),
SegmentReplicationState.Stage.DONE
);
}, 1, TimeUnit.MINUTES);
}

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4620,7 +4620,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException {
*/
public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
logger.trace("Downloading segments from remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
Expand Down Expand Up @@ -4710,7 +4710,7 @@ public void syncSegmentsFromGivenRemoteSegmentStore(
long primaryTerm,
long commitGeneration
) throws IOException {
logger.info("Downloading segments from given remote segment store");
logger.trace("Downloading segments from given remote segment store");
RemoteSegmentStoreDirectory remoteDirectory = null;
if (remoteStore != null) {
remoteDirectory = getRemoteDirectory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public RemoteSegmentMetadata readLatestMetadataFile() throws IOException {
return remoteSegmentMetadata;
}

private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws IOException {
private RemoteSegmentMetadata readMetadataFile(String metadataFilename) throws IOException {S
try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
byte[] metadataBytes = new byte[(int) indexInput.length()];
indexInput.readBytes(metadataBytes, 0, (int) indexInput.length());
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,36 +8,168 @@

package org.opensearch.index.shard;

import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.SegmentReplicationSourceFactory;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;

public class SegmentReplicationWithRemoteIndexShardTests extends OpenSearchIndexLevelReplicationTestCase {
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;

public class SegmentReplicationWithRemoteIndexShardTests extends SegmentReplicationIndexShardTests {

private static final String REPOSITORY_NAME = "temp-fs";
private static final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "temp-fs")
.put(IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME)
.build();
TransportService transportService;
IndicesService indicesService;
RecoverySettings recoverySettings;
SegmentReplicationSourceFactory sourceFactory;

@Before
public void setup() {
recoverySettings = new RecoverySettings(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);
transportService = mock(TransportService.class);
indicesService = mock(IndicesService.class);
sourceFactory = new SegmentReplicationSourceFactory(transportService, recoverySettings, clusterService);
// Todo: Remove feature flag once remote store integration with segrep goes GA
FeatureFlags.initializeFeatureFlags(
Settings.builder().put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL_SETTING.getKey(), "true").build()
);
}

protected Settings getIndexSettings() {
return settings;
}

protected ReplicationGroup getReplicationGroup(int numberOfReplicas) throws IOException {
return createGroup(numberOfReplicas, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir());
}

protected SegmentReplicationTargetService prepareForReplication(
IndexShard primaryShard,
IndexShard target,
TransportService transportService,
IndicesService indicesService,
ClusterService clusterService,
Consumer<IndexShard> postGetFilesRunnable
) {
final SegmentReplicationTargetService targetService = new SegmentReplicationTargetService(
threadPool,
new RecoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
transportService,
sourceFactory,
indicesService,
clusterService
);
return targetService;
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshRefresh() throws Exception {
testNRTReplicaWithRemoteStorePromotedAsPrimary(false, false);
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimaryRefreshCommit() throws Exception {
testNRTReplicaWithRemoteStorePromotedAsPrimary(false, true);
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitRefresh() throws Exception {
testNRTReplicaWithRemoteStorePromotedAsPrimary(true, false);
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimaryCommitCommit() throws Exception {
testNRTReplicaWithRemoteStorePromotedAsPrimary(true, true);
}

public void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlushFirst, boolean performFlushSecond) throws Exception {
try (
ReplicationGroup shards = createGroup(1, getIndexSettings(), indexMapping, new NRTReplicationEngineFactory(), createTempDir())
) {
shards.startAll();
IndexShard oldPrimary = shards.getPrimary();
final IndexShard nextPrimary = shards.getReplicas().get(0);

// 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point.
final int numDocs = shards.indexDocs(randomInt(10));

// refresh but do not copy the segments over.
if (performFlushFirst) {
flushShard(oldPrimary, true);
} else {
oldPrimary.refresh("Test");
}
// replicateSegments(primary, shards.getReplicas());

// at this point both shards should have numDocs persisted and searchable.
assertDocCounts(oldPrimary, numDocs, numDocs);
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, numDocs, 0);
}

// 2. Create ops that are in the replica's xlog, not in the index.
// index some more into both but don't replicate. replica will have only numDocs searchable, but should have totalDocs
// persisted.
final int additonalDocs = shards.indexDocs(randomInt(10));
final int totalDocs = numDocs + additonalDocs;

if (performFlushSecond) {
flushShard(oldPrimary, true);
} else {
oldPrimary.refresh("Test");
}
assertDocCounts(oldPrimary, totalDocs, totalDocs);
for (IndexShard shard : shards.getReplicas()) {
assertDocCounts(shard, totalDocs, 0);
}
assertTrue(nextPrimary.translogStats().estimatedNumberOfOperations() >= additonalDocs);
assertTrue(nextPrimary.translogStats().getUncommittedOperations() >= additonalDocs);

// promote the replica
shards.promoteReplicaToPrimary(nextPrimary).get();

// close oldPrimary.
oldPrimary.close("demoted", false, false);
oldPrimary.store().close();

assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass());
assertDocCounts(nextPrimary, totalDocs, totalDocs);

// As we are downloading segments from remote segment store on failover, there should not be
// any operations replayed from translog
assertEquals(0, nextPrimary.translogStats().estimatedNumberOfOperations());

// refresh and push segments to our other replica.
nextPrimary.refresh("test");

public void testReplicaSyncingFromRemoteStore() throws IOException {
ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir());
final IndexShard primaryShard = shards.getPrimary();
final IndexShard replicaShard = shards.getReplicas().get(0);
shards.startPrimary();
shards.startAll();
indexDoc(primaryShard, "_doc", "1");
indexDoc(primaryShard, "_doc", "2");
primaryShard.refresh("test");
assertDocs(primaryShard, "1", "2");
flushShard(primaryShard);

replicaShard.syncSegmentsFromRemoteSegmentStore(true, true);
assertDocs(replicaShard, "1", "2");
closeShards(primaryShard, replicaShard);
for (IndexShard shard : shards) {
assertConsistentHistoryBetweenTranslogAndLucene(shard);
}
final List<DocIdSeqNoAndSource> docsAfterRecovery = getDocIdAndSeqNos(shards.getPrimary());
for (IndexShard shard : shards.getReplicas()) {
assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterRecovery));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ protected ReplicationGroup createGroup(int replicas, Settings settings, EngineFa

protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory)
throws IOException {
return createGroup(replicas, settings, mappings, engineFactory, null);
Path remotePath = null;
if (settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED) != null
&& settings.get(IndexMetadata.SETTING_REMOTE_STORE_ENABLED).equals("true")) {
remotePath = createTempDir();
}
return createGroup(replicas, settings, mappings, engineFactory, remotePath);
}

protected ReplicationGroup createGroup(int replicas, Settings settings, String mappings, EngineFactory engineFactory, Path remotePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1328,7 +1328,9 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {

/**
* Segment Replication specific test method - Creates a {@link SegmentReplicationTargetService} to perform replications that has
* been configured to return the given primaryShard's current segments.
* been configured to return the given primaryShard's current segments. In order to do so, it mimics the replication
* source (to avoid transport calls) and simply copies over the segment files from primary store to replica's as part of
* get_files calls.
*
* @param primaryShard {@link IndexShard} - The target replica shard in segment replication.
* @param target {@link IndexShard} - The source primary shard in segment replication.
Expand All @@ -1339,7 +1341,7 @@ public static Engine.Warmer createTestWarmer(IndexSettings indexSettings) {
* which are desired right after files are copied. e.g. To work with temp files
* @return Returns SegmentReplicationTargetService
*/
public final SegmentReplicationTargetService prepareForReplication(
protected SegmentReplicationTargetService prepareForReplication(
IndexShard primaryShard,
IndexShard target,
TransportService transportService,
Expand All @@ -1362,6 +1364,7 @@ public final SegmentReplicationTargetService prepareForReplication(
postGetFilesRunnable
);
when(sourceFactory.get(any())).thenReturn(replicationSource);
// This is needed for force segment sync call. Remote store uses a different recovery mechanism
when(indicesService.getShardOrNull(any())).thenReturn(target);
return targetService;
}
Expand Down Expand Up @@ -1502,9 +1505,11 @@ public void getSegmentFiles(
* @param replicaShards - Replicas that will be updated.
* @return {@link List} List of target components orchestrating replication.
*/
public final List<SegmentReplicationTarget> replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards)
protected final List<SegmentReplicationTarget> replicateSegments(IndexShard primaryShard, List<IndexShard> replicaShards)
throws IOException, InterruptedException {
// Latch to block test execution until replica catches up
final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size());
// Get primary metadata to verify with replica's, used to ensure replica catches up
Map<String, StoreFileMetadata> primaryMetadata;
try (final GatedCloseable<SegmentInfos> segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) {
final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get();
Expand Down

0 comments on commit 60126e5

Please sign in to comment.