Skip to content

Commit

Permalink
[Segment Replication] Fix Flaky Test SegmentReplicationRelocationIT.t…
Browse files Browse the repository at this point in the history
…estRelocateWhileContinuouslyIndexingAndWaitingForRefresh (#6637)

* Trigger Refresh on NRT Engine.

Signed-off-by: Rishikesh1159 <[email protected]>

* Revert changes made to PublishCheckpointAction in #6366

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix failing unit test

Signed-off-by: Rishikesh1159 <[email protected]>

* Force flush on new elected primary after relocation.

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix failing unit test.

Signed-off-by: Rishikesh1159 <[email protected]>

* Remove unnecessary assertions

Signed-off-by: Rishikesh1159 <[email protected]>

* Adding tests.

Signed-off-by: Rishikesh1159 <[email protected]>

* Address comments

Signed-off-by: Rishikesh1159 <[email protected]>

* Fix indentation.

Signed-off-by: Rishikesh1159 <[email protected]>

---------

Signed-off-by: Rishikesh1159 <[email protected]>
  • Loading branch information
Rishikesh1159 authored Mar 15, 2023
1 parent 535942e commit 1e5d913
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.ClusterState;
Expand All @@ -33,6 +34,7 @@

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

/**
* This test class verifies primary shard relocation with segment replication as replication strategy.
Expand Down Expand Up @@ -494,4 +496,72 @@ public void testAddNewReplicaFailure() throws Exception {
assertTrue(clusterHealthResponse.isTimedOut());
ensureYellow(INDEX_NAME);
}

public void testFlushAfterRelocation() throws Exception {
// Starting two nodes with primary and replica shards respectively.
final String primaryNode = internalCluster().startNode();
prepareCreate(
INDEX_NAME,
Settings.builder()
// we want to control refreshes
.put("index.refresh_interval", -1)
).get();
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// Start another empty node for relocation
final String newPrimary = internalCluster().startNode();
ClusterHealthResponse clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNodes("3")
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);
ensureGreen(INDEX_NAME);

// Start indexing docs
final int initialDocCount = scaledRandomIntBetween(2000, 3000);
for (int i = 0; i < initialDocCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
}

// Verify segment replication event never happened on replica shard
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
assertFalse(segmentReplicationStatsResponse.hasSegmentReplicationStats());

// Relocate primary to new primary. When new primary starts it does perform a flush.
logger.info("--> relocate the shard from primary to newPrimary");
ActionFuture<ClusterRerouteResponse> relocationListener = client().admin()
.cluster()
.prepareReroute()
.add(new MoveAllocationCommand(INDEX_NAME, 0, primaryNode, newPrimary))
.execute();
clusterHealthResponse = client().admin()
.cluster()
.prepareHealth()
.setWaitForEvents(Priority.LANGUID)
.setWaitForNoRelocatingShards(true)
.setTimeout(ACCEPTABLE_RELOCATION_TIME)
.execute()
.actionGet();
assertEquals(clusterHealthResponse.isTimedOut(), false);

// Verify if all docs are present in replica after relocation, if new relocated primary doesn't flush after relocation the below
// assert will fail.
assertBusy(
() -> {
assertHitCount(
client(replicaNode).prepareSearch(INDEX_NAME).setPreference("_only_local").setSize(0).get(),
initialDocCount
);
}
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,22 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
maybeRefresh(source);
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
return false;
try {
return readerManager.maybeRefresh();
} catch (IOException e) {
try {
failEngine("refresh failed source[" + source + "]", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new RefreshFailedEngineException(shardId, e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {
@Override
protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader referenceToRefresh) throws IOException {
Objects.requireNonNull(referenceToRefresh);
// checks if an actual refresh (change in segments) happened
if (unwrapStandardReader(referenceToRefresh).getSegmentInfos().version == currentInfos.version) {
return null;
}
final List<LeafReader> subs = new ArrayList<>();
final StandardDirectoryReader standardDirectoryReader = unwrapStandardReader(referenceToRefresh);
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void beforeRefresh() throws IOException {

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
if (didRefresh && shard.state() == IndexShardState.STARTED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard, shard.getLatestReplicationCheckpoint());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ public void updateShardState(
: "a primary relocation is completed by the cluster-managerr, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");

// Flush here after relocation of primary, so that replica get all changes from new primary rather than waiting for more
// docs to get indexed.
if (indexSettings.isSegRepEnabled()) {
flush(new FlushRequest().waitIfOngoing(true).force(true));
}
} else if (currentRouting.primary()
&& currentRouting.relocating()
&& replicationTracker.isRelocated()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,12 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.replication.ReplicationMode;
import org.opensearch.action.support.replication.ReplicationOperation;
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationTask;
import org.opensearch.action.support.replication.TransportReplicationAction;
import org.opensearch.cluster.action.shard.ShardStateAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.StreamInput;
Expand All @@ -33,18 +27,22 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardClosedException;
import org.opensearch.index.shard.ShardNotInPrimaryModeException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.common.ReplicationTimer;
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import org.opensearch.action.support.replication.ReplicationMode;

/**
* Replication action responsible for publishing checkpoint to a replica shard.
*
Expand Down Expand Up @@ -110,32 +108,35 @@ public ReplicationMode getReplicationMode(IndexShard indexShard) {
* Publish checkpoint request to shard
*/
final void publish(IndexShard indexShard, ReplicationCheckpoint checkpoint) {
String primaryAllocationId = indexShard.routingEntry().allocationId().getId();
long primaryTerm = indexShard.getPendingPrimaryTerm();
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
// we have to execute under the system context so that if security is enabled the sync is authorized
threadContext.markAsSystemContext();
PublishCheckpointRequest request = new PublishCheckpointRequest(checkpoint);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
transportService.sendChildRequest(
indexShard.recoveryState().getTargetNode(),
transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

final List<ShardRouting> replicationTargets = indexShard.getReplicationGroup().getReplicationTargets();
for (ShardRouting replicationTarget : replicationTargets) {
if (replicationTarget.primary()) {
continue;
}
final DiscoveryNode node = clusterService.state().nodes().get(replicationTarget.currentNodeId());
final ConcreteReplicaRequest<PublishCheckpointRequest> replicaRequest = new ConcreteReplicaRequest<>(
request,
replicationTarget.allocationId().getId(),
primaryTerm,
indexShard.getLastKnownGlobalCheckpoint(),
indexShard.getMaxSeqNoOfUpdatesOrDeletes()
);
final ReplicationTimer timer = new ReplicationTimer();
timer.start();
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "segrep_publish_checkpoint", request);
ActionListener<ReplicationOperation.ReplicaResponse> listener = new ActionListener<>() {
@Override
public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
public void handleResponse(ReplicationResponse response) {
timer.stop();
logger.trace(
() -> new ParameterizedMessage(
Expand All @@ -150,36 +151,29 @@ public void onResponse(ReplicationOperation.ReplicaResponse replicaResponse) {
}

@Override
public void onFailure(Exception e) {
public void handleException(TransportException e) {
timer.stop();
logger.trace("[shardId {}] Failed to publish checkpoint, timing: {}", indexShard.shardId().getId(), timer.time());
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(
e,
NodeClosedException.class,
IndexNotFoundException.class,
AlreadyClosedException.class,
IndexShardClosedException.class
IndexShardClosedException.class,
ShardNotInPrimaryModeException.class
) != null) {
// the index was deleted or the shard is closed
// Node is shutting down or the index was deleted or the shard is closed
return;
}
logger.warn(
new ParameterizedMessage("{} segment replication checkpoint publishing failed", indexShard.shardId()),
e
);
}
};
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
listener,
ReplicaResponse::new
);
transportService.sendChildRequest(node, transportReplicaAction, replicaRequest, task, transportOptions, handler);
}
}
);
logger.trace(
() -> new ParameterizedMessage(
"[shardId {}] Publishing replication checkpoint [{}]",
Expand All @@ -196,7 +190,7 @@ protected void shardOperationOnPrimary(
IndexShard primary,
ActionListener<PrimaryResult<PublishCheckpointRequest, ReplicationResponse>> listener
) {
throw new OpenSearchException("PublishCheckpointAction should not hit primary shards");
ActionListener.completeWith(listener, () -> new PrimaryResult<>(request, new ReplicationResponse()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.search.ReferenceManager;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.seqno.LocalCheckpointTracker;
Expand Down Expand Up @@ -182,6 +184,28 @@ public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOExcep
}
}

public void testRefreshOnNRTEngine() throws IOException {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

try (
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration());
assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration());

ReferenceManager<OpenSearchDirectoryReader> referenceManager = nrtEngine.getReferenceManager(Engine.SearcherScope.EXTERNAL);
OpenSearchDirectoryReader readerBeforeRefresh = referenceManager.acquire();

nrtEngine.refresh("test refresh");
OpenSearchDirectoryReader readerAfterRefresh = referenceManager.acquire();

// Verify both readers before and after refresh are same and no change in segments
assertSame(readerBeforeRefresh, readerAfterRefresh);

}
}

public void testTrimTranslogOps() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.index.shard;

import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfos;
import org.junit.Assert;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -569,21 +568,15 @@ public void testReplicaReceivesLowerGeneration() throws Exception {
numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
shards.indexDocs(numDocs);
flushShard(primary, false);
assertLatestCommitGen(4, primary);
replicateSegments(primary, List.of(replica_1));

assertEqualCommittedSegments(primary, replica_1);
assertLatestCommitGen(4, primary);
assertLatestCommitGen(5, replica_1);
assertLatestCommitGen(3, replica_2);

shards.promoteReplicaToPrimary(replica_2).get();
primary.close("demoted", false);
primary.store().close();
IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId());
shards.recoverReplica(oldPrimary);
assertLatestCommitGen(5, oldPrimary);
assertLatestCommitGen(5, replica_2);

numDocs = randomIntBetween(numDocs + 1, numDocs + 10);
shards.indexDocs(numDocs);
Expand Down Expand Up @@ -1078,14 +1071,6 @@ private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws I
return newPrimary;
}

private void assertLatestCommitGen(long expected, IndexShard... shards) throws IOException {
for (IndexShard indexShard : shards) {
try (final GatedCloseable<IndexCommit> commit = indexShard.acquireLastIndexCommit(false)) {
assertEquals(expected, commit.get().getGeneration());
}
}
}

private void assertEqualCommittedSegments(IndexShard primary, IndexShard... replicas) throws IOException {
for (IndexShard replica : replicas) {
final SegmentInfos replicaInfos = replica.store().readLastCommittedSegmentsInfo();
Expand Down
Loading

0 comments on commit 1e5d913

Please sign in to comment.