Skip to content

Commit

Permalink
Fix flaky Segment Replication test testStartReplicaAfterPrimaryIndexe…
Browse files Browse the repository at this point in the history
…sDocs. (#5722)

* Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs.

This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation if a passed in checkopint.  In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed.  This change differentiates shard validation from checkpoint validation.

Signed-off-by: Marc Handalian <[email protected]>

Fix spotless.

Signed-off-by: Marc Handalian <[email protected]>

Fix testIsSegmentReplicationAllowed_WrongEngineType.

Signed-off-by: Marc Handalian <[email protected]>

Update warn logs in isSegmentReplicationAllowed.

Signed-off-by: Marc Handalian <[email protected]>

* PR feedback.

Signed-off-by: Marc Handalian <[email protected]>

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Jan 7, 2023
1 parent 4a62341 commit 85f4149
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,6 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
Expand Down
50 changes: 41 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
Expand Down Expand Up @@ -1454,22 +1455,53 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this target shard should start a round of segment replication.
* @return - True if the shard is able to perform segment replication.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
if (indexSettings.isSegRepEnabled() == false) {
logger.warn("Attempting to perform segment replication when it is not enabled on the index");
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
logger.warn("Ignoring new replication checkpoint - shard is in primaryMode and cannot receive any checkpoints.");
logger.warn("Shard is in primary mode and cannot perform segment replication as a replica.");
return false;
}
if (this.routingEntry().primary()) {
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
logger.warn("Shard is marked as primary and cannot perform segment replication as a replica");
return false;
}
if (state().equals(IndexShardState.STARTED) == false
&& (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(
() -> new ParameterizedMessage(
"Shard is not started or recovering {} {} and cannot perform segment replication as a replica",
state(),
shardRouting.state()
)
);
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Shard does not have the correct engine type to perform segment replication {}.",
getEngine().getClass()
)
);
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
Expand Down Expand Up @@ -811,11 +810,7 @@ private void forceSegmentReplication(
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
Expand Down Expand Up @@ -97,6 +98,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory());
assertFalse(indexShard.isSegmentReplicationAllowed());
closeShards(indexShard);
}

public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {
Expand Down

0 comments on commit 85f4149

Please sign in to comment.