[Segment Replication] For replica recovery, force segment replication sync from source

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Jan 7, 2023
1 parent 85f4149 commit 0d133e6
Showing 4 changed files with 64 additions and 82 deletions.
File 1 of 4 (segment replication integration tests)
@@ -56,7 +56,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

-import static org.hamcrest.Matchers.equalTo;
import static org.opensearch.index.query.QueryBuilders.matchQuery;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@@ -219,7 +218,57 @@ public void testCancelPrimaryAllocation() throws Exception {
}

/**
- * This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
+ * This test verifies that adding a new node, which results in peer recovery of a replica, also brings the replica's
+ * replication checkpoint up to the primary's by performing a round of segment replication.
+ */
+public void testNewlyAddedReplicaIsUpdated() {
+    internalCluster().startNode(featureFlagSettings());
+    prepareCreate(
+        INDEX_NAME,
+        Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+    ).get();
+    for (int i = 0; i < 10; i++) {
+        client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+    }
+    logger.info("--> flush so we have some segment files on disk");
+    flush(INDEX_NAME);
+    logger.info("--> index more docs so we have something in the translog");
+    for (int i = 10; i < 20; i++) {
+        client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
+    }
+    refresh(INDEX_NAME);
+    assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
+
+    logger.info("--> start empty node to add replica shard");
+    final String replicaNode = internalCluster().startNode(featureFlagSettings());
+    ensureGreen(INDEX_NAME);
+    // Update replica count to 1 so that peer recovery triggers and recovers replicaNode
+    assertAcked(
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(INDEX_NAME)
+            .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
+    );
+
+    // Verify the cluster turns green: the replica is added only after a forced round of segment replication brings it up to the primary's checkpoint
+    ClusterHealthResponse clusterHealthResponse = client().admin()
+        .cluster()
+        .prepareHealth()
+        .setWaitForEvents(Priority.LANGUID)
+        .setWaitForNodes("2")
+        .setWaitForGreenStatus()
+        .setTimeout(TimeValue.timeValueSeconds(2))
+        .execute()
+        .actionGet();
+    assertFalse(clusterHealthResponse.isTimedOut());
+    ensureYellow(INDEX_NAME);
+    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
+    assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
+    assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
+}

+/**
+ * This test verifies that a replica shard is not added to the cluster when a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as it's flaky and needs a separate fix
*/
@@ -246,7 +295,7 @@ public void testAddNewReplicaFailure() throws Exception {
}
refresh(INDEX_NAME);
logger.info("--> verifying count");
-assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
+assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode(featureFlagSettings());
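Note: the tests in this file start nodes with featureFlagSettings(), since segment replication was still gated behind an experimental feature flag at the time. That override is not part of the shown hunks; below is a minimal sketch of what such a test class typically supplies. The FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL constant and the override itself are assumptions inferred from how these ITs were gated, not lines from this commit.

// Sketch only: the flag constant and this override are assumptions, not part of the diff.
@Override
protected Settings featureFlagSettings() {
    return Settings.builder()
        .put(super.featureFlagSettings())
        .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
        .build();
}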
File 2 of 4 (shard recovery-done handling)
@@ -37,7 +37,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
-import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
@@ -83,11 +82,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
-import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
-import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
-import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
@@ -781,78 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState recoveryState = (RecoveryState) state;
-    AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
-    StepListener<Void> forceSegRepListener = new StepListener<>();
-    // For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before
-    // it is marked as Started.
-    if (indexService.getIndexSettings().isSegRepEnabled()) {
-        forceSegmentReplication(indexService, shardRouting, forceSegRepListener);
-    } else {
-        forceSegRepListener.onResponse(null);
-    }
-    forceSegRepListener.whenComplete(
-        v -> shardStateAction.shardStarted(
-            shardRouting,
-            primaryTerm,
-            "after " + recoveryState.getRecoverySource(),
-            SHARD_STATE_ACTION_LISTENER
-        ),
-        e -> handleRecoveryFailure(shardRouting, true, e)
-    );
-}
-
-/**
- * Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary.
- */
-private void forceSegmentReplication(
-    AllocatedIndex<? extends Shard> indexService,
-    ShardRouting shardRouting,
-    StepListener<Void> forceSegRepListener
-) {
-    IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
-    if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
-        segmentReplicationTargetService.startReplication(
-            ReplicationCheckpoint.empty(shardRouting.shardId()),
-            indexShard,
-            new SegmentReplicationTargetService.SegmentReplicationListener() {
-                @Override
-                public void onReplicationDone(SegmentReplicationState state) {
-                    logger.trace(
-                        () -> new ParameterizedMessage(
-                            "[shardId {}] [replication id {}] Replication complete, timing data: {}",
-                            indexShard.shardId().getId(),
-                            state.getReplicationId(),
-                            state.getTimingData()
-                        )
-                    );
-                    forceSegRepListener.onResponse(null);
-                }
-
-                @Override
-                public void onReplicationFailure(
-                    SegmentReplicationState state,
-                    ReplicationFailedException e,
-                    boolean sendShardFailure
-                ) {
-                    logger.trace(
-                        () -> new ParameterizedMessage(
-                            "[shardId {}] [replication id {}] Replication failed, timing data: {}",
-                            indexShard.shardId().getId(),
-                            state.getReplicationId(),
-                            state.getTimingData()
-                        )
-                    );
-                    if (sendShardFailure == true) {
-                        logger.error("replication failure", e);
-                        indexShard.failShard("replication failure", e);
-                    }
-                    forceSegRepListener.onFailure(e);
-                }
-            }
-        );
-    } else {
-        forceSegRepListener.onResponse(null);
-    }
+    shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

private void failAndRemoveShard(
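Net effect of this file's change: the StepListener plumbing and the private forceSegmentReplication helper are removed, and handleRecoveryDone reduces to its pre-segment-replication shape. Reconstructed from the unchanged context lines above:

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
    RecoveryState recoveryState = (RecoveryState) state;
    // The forced round of segment replication now happens inside peer recovery
    // itself (see RecoverySourceHandler#finalizeRecovery in the next file), so
    // the shard can be marked started directly.
    shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}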
File 3 of 4 (peer recovery finalization on the source side)
@@ -839,7 +839,12 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
} else {
-    handoffListener.onResponse(null);
+    // Force a round of segment replication so the replica's checkpoint catches up to the primary's
+    if (shard.indexSettings().isSegRepEnabled()) {
+        recoveryTarget.forceSegmentFileSync(handoffListener);
+    } else {
+        handoffListener.onResponse(null);
+    }
}
handoffListener.whenComplete(res -> {
stopWatch.stop();
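RecoveryTargetHandler#forceSegmentFileSync itself is not part of the shown hunks. A minimal sketch of how the remote implementation plausibly bridges to the ForceSyncTransportRequestHandler in the next file, mirroring how other recovery actions are proxied over transport; the action name, request fields, sequence-number generator, and targetNode are all assumptions here — only ForceSyncRequest and the empty response are visible in this commit:

// Sketch under assumptions: FORCE_SYNC action name, requestSeqNoGenerator, and
// the recoveryId/shardId request fields are inferred, not shown in the diff.
public void forceSegmentFileSync(ActionListener<Void> listener) {
    final ForceSyncRequest request = new ForceSyncRequest(requestSeqNoGenerator.getAndIncrement(), recoveryId, shardId);
    transportService.sendRequest(
        targetNode,
        SegmentReplicationTargetService.Actions.FORCE_SYNC,
        request,
        new ActionListenerResponseHandler<>(
            ActionListener.map(listener, empty -> null),   // complete the handoff listener on ack
            in -> TransportResponse.Empty.INSTANCE         // handler replies with an empty response
        )
    );
}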
File 4 of 4 (force-sync transport request handler)
@@ -339,7 +339,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
}
}

-class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
+private class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
@Override
public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
assert indicesService != null;
@@ -359,7 +359,10 @@ public void onReplicationDone(SegmentReplicationState state) {
)
);
try {
-    indexShard.resetToWriteableEngine();
+    // Only a primary target (e.g. during primary relocation) needs its engine promoted to a writeable engine
+    if (indexShard.recoveryState().getPrimary() == true) {
+        indexShard.resetToWriteableEngine();
+    }
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
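Pieced together from the fragments above (the collapsed lines in between, such as the trace logging, are not shown on this page), the success path of the handler's replication listener now looks roughly like this; only the primary guard around the engine reset is new in this commit:

@Override
public void onReplicationDone(SegmentReplicationState state) {
    try {
        // Replicas recovered this way keep their read-only replication engine;
        // only a target that will act as primary (e.g. primary relocation)
        // must be promoted back to a writeable engine before it is started.
        if (indexShard.recoveryState().getPrimary() == true) {
            indexShard.resetToWriteableEngine();
        }
        channel.sendResponse(TransportResponse.Empty.INSTANCE);
    } catch (InterruptedException | TimeoutException | IOException e) {
        throw new RuntimeException(e);
    }
}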
