Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Trigger a round of replication for replica shards during peer recovery when segment replication is enabled #5332

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.BeforeClass;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand;
import org.opensearch.common.Nullable;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.Index;
Expand Down Expand Up @@ -53,6 +55,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -194,6 +197,48 @@ public void testCancelPrimaryAllocation() throws Exception {
assertSegmentStats(REPLICA_COUNT);
}

/**
* This test adds a new replica shard to an existing cluster which already has few docs inserted before adding replica.
* We don't perform any refresh on index and assert new replica shard on doc hit count.
* This test makes sure that when a new replica is added to an existing cluster it gets all latest segments from primary even without a refresh.
*/
public void testAddNewReplica() throws Exception {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is very similar to testStartReplicaAfterPrimaryIndexesDocs, can we reuse that test? That test currently indexes a doc after the replica is recovered to force another round of replication, but you could assert the doc count is sync'd on line 412 after ensureGreen().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think you right. Let me see if we can reuse it

logger.info("--> starting [node1] ...");
final String node_1 = internalCluster().startNode();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Usually I find it better to call nodes by their role. This makes it easier to understand when we perform any node specific actions (e.g. restart(primary), stop (replica) etc). Otherwise, we need to look back when node_i was created and its role.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense. I will rename both nodes accordingly


logger.info("--> creating test index ...");
prepareCreate(INDEX_NAME, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the actual settings instead of strings - IndexMetadata.SETTING_NUMBER_OF_SHARDS

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes sure


logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have an actual index");
Copy link
Member

@dreamer-89 dreamer-89 Nov 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean by actual index -> index/segment files on disk ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, let me change the terminology here. actual index might be confusing.

client().admin().indices().prepareFlush().execute().actionGet();
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

logger.info("--> verifying count");
client().admin().indices().prepareRefresh().execute().actionGet();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - you can call refresh(INDEX_NAME);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure will make that change

assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));

logger.info("--> start another node");
final String node_2 = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.execute()
.actionGet();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
waitForReplicaUpdate();
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,12 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource.Type;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.RecoverySource.Type;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.component.AbstractLifecycleComponent;
Expand Down Expand Up @@ -82,8 +83,10 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
Expand Down Expand Up @@ -143,6 +146,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
private final Consumer<ShardId> globalCheckpointSyncer;
private final RetentionLeaseSyncer retentionLeaseSyncer;

private final SegmentReplicationTargetService segmentReplicationTargetService;

private final SegmentReplicationCheckpointPublisher checkpointPublisher;

@Inject
Expand Down Expand Up @@ -217,6 +222,7 @@ public IndicesClusterStateService(
indexEventListeners.add(segmentReplicationTargetService);
indexEventListeners.add(segmentReplicationSourceService);
}
this.segmentReplicationTargetService = segmentReplicationTargetService;
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
this.indicesService = indicesService;
this.clusterService = clusterService;
Expand Down Expand Up @@ -774,7 +780,52 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState RecState = (RecoveryState) state;
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before it
// is marked as Started.
if (indexShard.indexSettings().isSegRepEnabled()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You will need a null check here given you are invoking getShardOrNull above.

Copy link
Member Author

@Rishikesh1159 Rishikesh1159 Nov 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out. Sure, I will add null check.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also read the setting from indexSettings before fetching a reference to the IndexShard.

indexService.getIndexSettings().isSegRepEnabled().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can add that

&& shardRouting.primary() == false
&& ShardRoutingState.RELOCATING != shardRouting.state()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this condition be ShardRoutingState.STARTED == shardRouting.state() ? Existing condition applies for UNASSIGNED and INITIALIED shards, is that correct ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think ShardRoutingState.RELOCATING != shardRouting.state() is an edge case check we are doing, so that relocating shard doesn't receive any checkpoints.

For ShardRoutingState.STARTED == shardRouting.state() this check will be false at this point, because we are performing a round of replication before marking shard as STARTED. So, shard routing will never be in STARTED state at this point.

Yes existing conditions works for INITIALIZED shard routing state. ShardRoutingState.INITIALIZED will be shard routing state at this point. Not sure if shard routing state will be UNASSIGNED, after peer recovery is completed usually shard routing will be in INITIALIZED state.

segmentReplicationTargetService.startReplication(indexShard, new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + RecState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
);
}

@Override
public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: These are logged at debug level on failShard call. May be we can remove it from here

indexShard.failShard("replication failure", e);
}
}
});
} else {
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + RecState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

}

private void failAndRemoveShard(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,10 @@ public SegmentReplicationTarget startReplication(
return target;
}

public SegmentReplicationTarget startReplication(final IndexShard indexShard, final SegmentReplicationListener listener) {
return startReplication(ReplicationCheckpoint.empty(indexShard.shardId()), indexShard, listener);
}

// pkg-private for integration tests
void startReplication(final SegmentReplicationTarget target) {
final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout());
Expand Down