Skip to content


[Segment Replication] Update force segment replication round to be sy…
Browse files Browse the repository at this point in the history
…nchronous (opensearch-project#5898)

* Update force segment replication sync to be synchronous

Signed-off-by: Suraj Singh <[email protected]>

* Add logs and fix spotlessApply

Signed-off-by: Suraj Singh <[email protected]>

    pick da8cb72ab4f Update unit test post rebase

* Update unit test post rebase

Signed-off-by: Suraj Singh <[email protected]>

* Update integration tests

Signed-off-by: Suraj Singh <[email protected]>

* Mute testPrimaryRelocationWithSegRepFailure

Signed-off-by: Suraj Singh <[email protected]>

* Remove extra closing bracket after main merge

Signed-off-by: Suraj Singh <[email protected]>

* PR feedback

Signed-off-by: Suraj Singh <[email protected]>

* Spotless fix

Signed-off-by: Suraj Singh <[email protected]>


Signed-off-by: Suraj Singh <[email protected]>
  • Loading branch information
dreamer-89 authored and mch2 committed Mar 4, 2023
1 parent d81e876 commit 169f1fa
Show file tree
Hide file tree
Showing 15 changed files with 798 additions and 741 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
* SPDX-License-Identifier: Apache-2.0
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.

package org.opensearch.indices.replication;

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.Nullable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.assertBusy;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {

protected static final String INDEX_NAME = "test-idx-1";
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return asList(MockTransportService.TestPlugin.class);

protected boolean addMockInternalEngine() {
return false;

public Settings indexSettings() {
return Settings.builder()
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)

protected ShardRouting getShardRoutingForNodeName(String nodeName) {
final ClusterState state = getClusterState();
for (IndexShardRoutingTable shardRoutingTable : state.routingTable().index(INDEX_NAME)) {
for (ShardRouting shardRouting : shardRoutingTable.activeShards()) {
final String nodeId = shardRouting.currentNodeId();
final DiscoveryNode discoveryNode = state.nodes().resolveNode(nodeId);
if (discoveryNode.getName().equals(nodeName)) {
return shardRouting;
return null;

protected void assertDocCounts(int expectedDocCount, String... nodeNames) {
for (String node : nodeNames) {
assertHitCount(client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedDocCount);

protected ClusterState getClusterState() {
return client(internalCluster().getClusterManagerName()).admin().cluster().prepareState().get().getState();

protected DiscoveryNode getNodeContainingPrimaryShard() {
final ClusterState state = getClusterState();
final ShardRouting primaryShard = state.routingTable().index(INDEX_NAME).shard(0).primaryShard();
return state.nodes().resolveNode(primaryShard.currentNodeId());

* Waits until all given nodes have at least the expected docCount.
* @param docCount - Expected Doc count.
* @param nodes - List of node names.
protected void waitForSearchableDocs(long docCount, List<String> nodes) throws Exception {
// wait until the replica has the latest segment generation.
assertBusy(() -> {
for (String node : nodes) {
final SearchResponse response = client(node).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get();
final long hits = response.getHits().getTotalHits().value;
if (hits < docCount) {
fail("Expected search hits on node: " + node + " to be at least " + docCount + " but was: " + hits);
}, 1, TimeUnit.MINUTES);

protected void waitForSearchableDocs(long docCount, String... nodes) throws Exception {

protected void verifyStoreContent() throws Exception {
assertBusy(() -> {
final ClusterState clusterState = getClusterState();
for (IndexRoutingTable indexRoutingTable : clusterState.routingTable()) {
for (IndexShardRoutingTable shardRoutingTable : indexRoutingTable) {
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final String indexName = primaryRouting.getIndexName();
final List<ShardRouting> replicaRouting = shardRoutingTable.replicaShards();
final IndexShard primaryShard = getIndexShard(clusterState, primaryRouting, indexName);
final Map<String, StoreFileMetadata> primarySegmentMetadata = primaryShard.getSegmentMetadataMap();
for (ShardRouting replica : replicaRouting) {
IndexShard replicaShard = getIndexShard(clusterState, replica, indexName);
final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(
if (recoveryDiff.missing.isEmpty() == false || recoveryDiff.different.isEmpty() == false) {
"Expected no missing or different segments between primary and replica but diff was missing: "
+ recoveryDiff.missing
+ " Different: "
+ recoveryDiff.different
+ " Primary Replication Checkpoint : "
+ primaryShard.getLatestReplicationCheckpoint()
+ " Replica Replication Checkpoint: "
+ replicaShard.getLatestReplicationCheckpoint()
// calls to readCommit will fail if a valid commit point and all its segments are not in the store.;
}, 1, TimeUnit.MINUTES);

private IndexShard getIndexShard(ClusterState state, ShardRouting routing, String indexName) {
return getIndexShard(state.nodes().get(routing.currentNodeId()).getName(), indexName);

protected IndexShard getIndexShard(String node, String indexName) {
final Index index = resolveIndex(indexName);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
IndexService indexService = indicesService.indexServiceSafe(index);
final Optional<Integer> shardId = indexService.shardIds().stream().findFirst();
return indexService.getShard(shardId.get());


0 comments on commit 169f1fa

Please sign in to comment.