Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Fix flaky tests testSegmentReplicationStatsResponse() and testSegmentReplicationStatsWithTimeout() #6268

Merged
merged 10 commits into from
Feb 21, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -152,56 +147,6 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6255")
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
public void testSegmentReplicationStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String dataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String anotherDataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);

int numShards = 4;
assertAcked(
prepareCreate(
"test-index",
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");

// Happy case
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));

// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,45 @@

import java.util.concurrent.CountDownLatch;
import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
public void testSegmentReplicationStatsResponse() {
Copy link
Member

@dreamer-89 dreamer-89 Feb 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Rishikesh1159 : I also fixed this flaky test as part of #6370. Please rebase your changes against main ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure let me rebase

Copy link
Member

@dreamer-89 dreamer-89 Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, I see one run where this is still flaky #6366 (comment).
Previously, I root caused the problem happening due to race condition and fixed by asserting on replication state to be done which fixed it from 20% failing -> 0%. The assertBusy block by default waits for 10s not sure if segment replication needs more time to complete. May be we can increase the assertBusy timeout to 60secs and check if this test is still flaky.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes testSegmentReplicationStatsResponse() was flaky previously because of stage not reaching done within certain time. But now I have changed the test completely. As we are only testing API response and we are concerned only about API response but not about segment replication stage. So I changed the test and we no more assert on segrep stage which was causing flakiness.

Copy link
Member Author

@Rishikesh1159 Rishikesh1159 Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With changes in testSegmentReplicationStatsResponse() we only assert if we get a successful or failure response from API call. We are not concerned about if segment replication event has finished processing or not, as purpose of API is only to give response irrespective of state of segrep event.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is still value in asserting the replication state as it can uncover bugs where replication never completes for various reasons and it is also part of API response. I will leave it for you to decide.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can add asserts on replication state. It doesn't break anything but as you said there is a chance it can uncover some bugs. I have added that in latest commit. Thanks @dreamer-89

internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();

int numShards = 4;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
final SegmentReplicationStatsResponse response = client().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertEquals(response.shardSegmentReplicationStates().size(), SHARD_COUNT);
assertEquals(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0);
});
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
assertThat(segmentReplicationStatsResponse.shardSegmentReplicationStates().size(), equalTo(1));
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));
}

public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
Expand Down Expand Up @@ -121,6 +131,10 @@ public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throw
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
SegmentReplicationState.Stage.DONE
);
assertThat(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use assertEquals instead of assertThat here. This is applicable for other usages of assertThat as well

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I will update all usages

completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount(),
greaterThan(0)
);
waitForAssertions.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest
singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
return null;
}
if (indexShard.indexSettings().isSegRepEnabled() == false) {
if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change is needed ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If shard is primary we don't even need to make a call to get segment replication state, as segment replication only happens on replica shards. If we don't have this primary check here, it will make a call to get segment replication state and check some collections and eventually return null on a primary shard. So with this addition of condition check we are just returning null early if it is a primary sahrd

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ReplicationState implementation to track Segment Replication events.
Expand Down Expand Up @@ -121,23 +122,28 @@ public Map<String, Long> getTimingData() {
}

public TimeValue getReplicatingStageTime() {
return new TimeValue(timingData.get(Stage.REPLICATING.toString()));
long time = timingData.getOrDefault(Stage.REPLICATING.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetCheckpointInfoStageTime() {
return new TimeValue(timingData.get(Stage.GET_CHECKPOINT_INFO.toString()));
long time = timingData.getOrDefault(Stage.GET_CHECKPOINT_INFO.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFileDiffStageTime() {
return new TimeValue(timingData.get(Stage.FILE_DIFF.toString()));
long time = timingData.getOrDefault(Stage.FILE_DIFF.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetFileStageTime() {
return new TimeValue(timingData.get(Stage.GET_FILES.toString()));
long time = timingData.getOrDefault(Stage.GET_FILES.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFinalizeReplicationStageTime() {
return new TimeValue(timingData.get(Stage.FINALIZE_REPLICATION.toString()));
long time = timingData.getOrDefault(Stage.FINALIZE_REPLICATION.toString(), 0L);
return new TimeValue(time);
}

public SegmentReplicationState(
Expand All @@ -153,7 +159,7 @@ public SegmentReplicationState(
this.sourceDescription = sourceDescription;
this.targetNode = targetNode;
// Timing data will have as many entries as stages, plus one
timingData = new HashMap<>(Stage.values().length + 1);
timingData = new ConcurrentHashMap<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
setStage(Stage.INIT);
Expand All @@ -180,7 +186,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(replicationId);
overallTimer.writeTo(out);
stageTimer.writeTo(out);
out.writeMap(timingData, StreamOutput::writeString, StreamOutput::writeLong);

// Copy of timingData is created to avoid concurrent modification of timingData map.
Map<String, Long> timingDataCopy = new HashMap<>();
for (Map.Entry<String, Long> entry : timingData.entrySet()) {
timingDataCopy.put(entry.getKey(), entry.getValue());
}
out.writeMap(timingDataCopy, StreamOutput::writeString, StreamOutput::writeLong);
out.writeString(sourceDescription);
targetNode.writeTo(out);
}
Expand Down Expand Up @@ -257,22 +269,20 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, getTimer().stopTime());
}
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(getTimer().time()));
if (sourceDescription != null) {
builder.field(Fields.SOURCE, getSourceDescription());
}
builder.field(Fields.SOURCE, getSourceDescription());

builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();

if (targetNode != null) {
builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();
}
builder.startObject(SegmentReplicationState.Fields.INDEX);
index.toXContent(builder, params);
builder.endObject();

builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ protected Table getTableWithHeader(RestRequest request) {
t.startHeaders()
.addCell("index", "alias:i,idx;desc:index name")
.addCell("shardId", "alias:s;desc: shard Id")
.addCell("start_time", "default:false;alias:start;desc:segment replication start time")
.addCell("start_time_millis", "default:false;alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "default:false;alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "default:false;alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("time", "alias:t,ti;desc:segment replication time")
.addCell("stage", "alias:st;desc:segment replication stage")
.addCell("source_description", "alias:sdesc;desc:source description")
Expand All @@ -106,7 +102,11 @@ protected Table getTableWithHeader(RestRequest request) {
.addCell("bytes_fetched", "alias:bf;desc:bytes fetched")
.addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched");
if (detailed) {
t.addCell("files", "alias:f;desc:number of files to fetch")
t.addCell("start_time", "alias:start;desc:segment replication start time")
.addCell("start_time_millis", "alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("files", "alias:f;desc:number of files to fetch")
.addCell("files_total", "alias:tf;desc:total number of files")
.addCell("bytes", "alias:b;desc:number of bytes to fetch")
.addCell("bytes_total", "alias:tb;desc:total number of bytes")
Expand Down Expand Up @@ -162,10 +162,6 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.startRow();
t.addCell(index);
t.addCell(state.getShardRouting().shardId().id());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(new TimeValue(state.getTimer().time()));
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getSourceDescription());
Expand All @@ -176,6 +172,10 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.addCell(state.getIndex().recoveredBytes());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
if (detailed) {
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(state.getIndex().totalRecoverFiles());
t.addCell(state.getIndex().totalFileCount());
t.addCell(state.getIndex().totalRecoverBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.Table;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentOpenSearchExtension;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.SegmentReplicationState;
Expand Down Expand Up @@ -92,10 +91,6 @@ public void testSegmentReplicationAction() {
final List<String> expectedHeaders = Arrays.asList(
"index",
"shardId",
"start_time",
"start_time_millis",
"stop_time",
"stop_time_millis",
"time",
"stage",
"source_description",
Expand All @@ -118,10 +113,6 @@ public void testSegmentReplicationAction() {
final List<Object> expectedValues = Arrays.asList(
"index",
i,
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()),
state.getTimer().startTime(),
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()),
state.getTimer().stopTime(),
new TimeValue(state.getTimer().time()),
state.getStage().name().toLowerCase(Locale.ROOT),
state.getSourceDescription(),
Expand Down