Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix flaky SR test testStartReplicaAfterPrimaryIndexesDocs.
Browse files Browse the repository at this point in the history
This test was failing because we are validating post recovery if a shard is able to perform segrep while also performing validation if a passed in checkopint.  In the post recovery test this checkpoint is always empty, yet the shard will be ahead of this checkpoint after docs are indexed.  This change differentiates shard validation from checkpoint validation.

Signed-off-by: Marc Handalian <[email protected]>
mch2 committed Jan 6, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent b3e25bb commit 0d0a5da
Showing 5 changed files with 36 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reject bulk requests with invalid actions ([#5299](https://github.com/opensearch-project/OpenSearch/issues/5299))
- Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460))
- Increasing timeout of testQuorumRecovery to 90 seconds from 30 ([#5651](https://github.com/opensearch-project/OpenSearch/pull/5651))
- Fixed flaky test SegmentReplicationIT.testStartReplicaAfterPrimaryIndexesDocs ([#5722](https://github.com/opensearch-project/OpenSearch/pull/5722))

### Security

Original file line number Diff line number Diff line change
@@ -502,7 +502,6 @@ public void testCancellation() throws Exception {
assertDocCounts(docCount, primaryNode);
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
final String primaryNode = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
34 changes: 27 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
@@ -73,6 +73,7 @@
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Booleans;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.CheckedFunction;
@@ -1438,15 +1439,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
}
}


/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
* Checks if this shard is able to perform segment replication.
* @return - True if the shard is able to perform segment replication.
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (state().equals(IndexShardState.STARTED) == false) {
logger.trace(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started {}", state()));
public boolean isSegmentReplicationAllowed() {
if (indexSettings.isSegRepEnabled() == false) {
return false;
}
if (getReplicationTracker().isPrimaryMode()) {
@@ -1457,6 +1456,27 @@ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckp
logger.warn("Ignoring new replication checkpoint - primary shard cannot receive any checkpoints.");
return false;
}
if (state().equals(IndexShardState.STARTED) == false && (state() == IndexShardState.POST_RECOVERY && shardRouting.state() == ShardRoutingState.INITIALIZING) == false) {
logger.warn(() -> new ParameterizedMessage("Ignoring new replication checkpoint - shard is not started or recovering {} {}", state(), shardRouting.state()));
return false;
}
if (getReplicationEngine().isEmpty()) {
logger.warn(() -> new ParameterizedMessage("Ignoring checkpoint, attempting to perform segrep with wrong engine type {}", getEngine().getClass()));
return false;
}
return true;
}

/**
* Checks if checkpoint should be processed
*
* @param requestCheckpoint received checkpoint that is checked for processing
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
if (isSegmentReplicationAllowed() == false) {
return false;
}
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.trace(
Original file line number Diff line number Diff line change
@@ -47,7 +47,6 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RoutingNode;
@@ -811,11 +810,7 @@ private void forceSegmentReplication(
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null
&& indexShard.indexSettings().isSegRepEnabled()
&& shardRouting.primary() == false
&& shardRouting.state() == ShardRoutingState.INITIALIZING
&& indexShard.state() == IndexShardState.POST_RECOVERY) {
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DocIdSeqNoAndSource;
import org.opensearch.index.engine.InternalEngine;
import org.opensearch.index.engine.InternalEngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.mapper.MapperService;
@@ -90,6 +91,12 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException {
closeShards(indexShard);
}

public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException {
final IndexShard indexShard = newStartedShard(false, settings, new InternalEngineFactory());
assertFalse(indexShard.isSegmentReplicationAllowed());
closeShards(indexShard);
}

public void testSegmentReplication_Index_Update_Delete() throws Exception {
String mappings = "{ \"" + MapperService.SINGLE_MAPPING_NAME + "\": { \"properties\": { \"foo\": { \"type\": \"keyword\"} }}}";
try (ReplicationGroup shards = createGroup(2, settings, mappings, new NRTReplicationEngineFactory())) {

0 comments on commit 0d0a5da

Please sign in to comment.