Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] For replica recovery, force segment replication sync from peer recovery source #5746

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[Segment Replication] For replica recovery, force segment replication…
… sync from source

Signed-off-by: Suraj Singh <[email protected]>
dreamer-89 committed Feb 1, 2023

Verified

This commit was signed with the committer’s verified signature.
Aaron1011 Aaron Hill
commit b46d307f69874ca1b81a9bc00917f7ad080202a8
Original file line number Diff line number Diff line change
@@ -140,8 +140,58 @@ public void testCancelPrimaryAllocation() throws Exception {
}

/**
* This test verfies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
* <p>
* This test verifies that adding a new node which results in peer recovery as replica; also bring replica's
* replication checkpoint upto the primary's by performing a round of segment replication.
*/
public void testNewlyAddedReplicaIsUpdated() {
internalCluster().startNode(featureFlagSettings());
prepareCreate(
INDEX_NAME,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
).get();
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
logger.info("--> flush so we have some segment files on disk");
flush(INDEX_NAME);
logger.info("--> index more docs so we have something in the translog");
for (int i = 10; i < 20; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}
refresh(INDEX_NAME);
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode(featureFlagSettings());
ensureGreen(INDEX_NAME);
// Update replica count settings to 1 so that peer recovery triggers and recover replicaNode
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// Verify that cluster state is not green and replica shard failed during a round of segment replication is not added to the cluster
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("2")
.setWaitForGreenStatus()
.setTimeout(TimeValue.timeValueSeconds(2))
.execute()
.actionGet();
assertFalse(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, replicaNode);
assertTrue(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);
}

/**
* This test verifies that replica shard is not added to the cluster when doing a round of segment replication fails during peer recovery.
*
* TODO: Ignoring this test as its flaky and needs separate fix
*/
@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
@@ -168,7 +218,7 @@ public void testAddNewReplicaFailure() throws Exception {
}
refresh(INDEX_NAME);
logger.info("--> verifying count");
assertThat(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
assertEquals(client().prepareSearch(INDEX_NAME).setSize(0).execute().actionGet().getHits().getTotalHits().value, 20L);

logger.info("--> start empty node to add replica shard");
final String replicaNode = internalCluster().startNode();
Original file line number Diff line number Diff line change
@@ -37,7 +37,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ResourceAlreadyExistsException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
@@ -83,11 +82,8 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.replication.SegmentReplicationSourceService;
import org.opensearch.indices.replication.SegmentReplicationState;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationFailedException;
import org.opensearch.indices.replication.common.ReplicationState;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.search.SearchService;
@@ -781,78 +777,7 @@ public synchronized void handleRecoveryFailure(ShardRouting shardRouting, boolea

public void handleRecoveryDone(ReplicationState state, ShardRouting shardRouting, long primaryTerm) {
RecoveryState recoveryState = (RecoveryState) state;
AllocatedIndex<? extends Shard> indexService = indicesService.indexService(shardRouting.shardId().getIndex());
StepListener<Void> forceSegRepListener = new StepListener<>();
// For Segment Replication enabled indices, we want replica shards to start a replication event to fetch latest segments before
// it is marked as Started.
if (indexService.getIndexSettings().isSegRepEnabled()) {
forceSegmentReplication(indexService, shardRouting, forceSegRepListener);
} else {
forceSegRepListener.onResponse(null);
}
forceSegRepListener.whenComplete(
v -> shardStateAction.shardStarted(
shardRouting,
primaryTerm,
"after " + recoveryState.getRecoverySource(),
SHARD_STATE_ACTION_LISTENER
),
e -> handleRecoveryFailure(shardRouting, true, e)
);
}

/**
* Forces a round of Segment Replication with empty checkpoint, so that replicas could fetch latest segment files from primary.
*/
private void forceSegmentReplication(
AllocatedIndex<? extends Shard> indexService,
ShardRouting shardRouting,
StepListener<Void> forceSegRepListener
) {
IndexShard indexShard = (IndexShard) indexService.getShardOrNull(shardRouting.id());
if (indexShard != null && indexShard.isSegmentReplicationAllowed()) {
segmentReplicationTargetService.startReplication(
ReplicationCheckpoint.empty(shardRouting.shardId()),
indexShard,
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
forceSegRepListener.onResponse(null);
}

@Override
public void onReplicationFailure(
SegmentReplicationState state,
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
state.getReplicationId(),
state.getTimingData()
)
);
if (sendShardFailure == true) {
logger.error("replication failure", e);
indexShard.failShard("replication failure", e);
}
forceSegRepListener.onFailure(e);
}
}
);
} else {
forceSegRepListener.onResponse(null);
}
shardStateAction.shardStarted(shardRouting, primaryTerm, "after " + recoveryState.getRecoverySource(), SHARD_STATE_ACTION_LISTENER);
}

private void failAndRemoveShard(
Original file line number Diff line number Diff line change
@@ -832,6 +832,11 @@ void finalizeRecovery(long targetLocalCheckpoint, long trimAboveSeqNo, ActionLis
* if the recovery process fails after disabling primary mode on the source shard, both relocation source and
* target are failed (see {@link IndexShard#updateRoutingEntry}).
*/
} else {
// Force round of segment replication to update its checkpoint to primary's
if (shard.indexSettings().isSegRepEnabled()) {
recoveryTarget.forceSegmentFileSync();
}
}
stopWatch.stop();
logger.info("finalizing recovery took [{}]", stopWatch.totalTime());
Original file line number Diff line number Diff line change
@@ -347,7 +347,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
}
}

class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
private class ForceSyncTransportRequestHandler implements TransportRequestHandler<ForceSyncRequest> {
@Override
public void messageReceived(final ForceSyncRequest request, TransportChannel channel, Task task) throws Exception {
assert indicesService != null;
@@ -367,7 +367,10 @@ public void onReplicationDone(SegmentReplicationState state) {
)
);
try {
indexShard.resetToWriteableEngine();
// Promote engine type for primary target
if (indexShard.recoveryState().getPrimary() == true) {
indexShard.resetToWriteableEngine();
}
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (InterruptedException | TimeoutException | IOException e) {
throw new RuntimeException(e);
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardTestCase;
import org.opensearch.index.store.StoreFileMetadata;
@@ -51,7 +52,7 @@ public void setUp() throws Exception {
super.setUp();
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build();
primary = newStartedShard(true, settings);
replica = newShard(primary.shardId(), false);
replica = newShard(false, settings, new NRTReplicationEngineFactory());
recoverReplica(replica, primary, true);
replicaDiscoveryNode = replica.recoveryState().getTargetNode();
}
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.CancellableThreads;
import org.opensearch.index.engine.NRTReplicationEngineFactory;
@@ -62,10 +61,9 @@ public void setUp() throws Exception {
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put("node.name", SegmentReplicationTargetServiceTests.class.getSimpleName())
.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
primaryShard = newStartedShard(true, settings);
replicaShard = newShard(false, settings, new NRTReplicationEngineFactory());
recoverReplica(replicaShard, primaryShard, true);
recoverReplica(replicaShard, primaryShard, true, getReplicationFunc(replicaShard));
checkpoint = new ReplicationCheckpoint(replicaShard.shardId(), 0L, 0L, 0L, 0L);
SegmentReplicationSourceFactory replicationSourceFactory = mock(SegmentReplicationSourceFactory.class);
replicationSource = mock(SegmentReplicationSource.class);
Original file line number Diff line number Diff line change
@@ -546,6 +546,7 @@ public void recoverReplica(
inSyncIds,
routingTable,
(a) -> null
// getReplicationFunc(replica)
);
OpenSearchIndexLevelReplicationTestCase.this.startReplicaAfterRecovery(replica, primary, inSyncIds, routingTable);
computeReplicationTargets();
Original file line number Diff line number Diff line change
@@ -848,6 +848,7 @@ protected DiscoveryNode getFakeDiscoNode(String id) {

protected void recoverReplica(IndexShard replica, IndexShard primary, boolean startReplica) throws IOException {
recoverReplica(replica, primary, startReplica, (a) -> null);
// recoverReplica(replica, primary, startReplica, getReplicationFunc(replica));
}

/** recovers a replica from the given primary **/
@@ -877,6 +878,19 @@ protected void recoverReplica(
recoverReplica(replica, primary, targetSupplier, markAsRecovering, markAsStarted, (a) -> null);
}

public BiFunction<List<IndexShard>, ActionListener<Void>, List<SegmentReplicationTarget>> getReplicationFunc(final IndexShard target) {
return target.indexSettings().isSegRepEnabled() ? (shardList, listener) -> {
try {
assert shardList.size() >= 2;
final IndexShard primary = shardList.get(0);
return replicateSegments(primary, shardList.subList(1, shardList.size()), listener);
} catch (IOException | InterruptedException e) {
listener.onFailure(e);
throw new RuntimeException(e);
}
} : (a, b) -> null;
}

/** recovers a replica from the given primary **/
protected void recoverReplica(
final IndexShard replica,