Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Fix flaky tests testSegmentReplicationStatsResponse() and testSegmentReplicationStatsWithTimeout() #6268

Merged
merged 10 commits into from
Feb 21, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@
import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryAction;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsAction;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsAction;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -152,56 +147,6 @@ public void testRecoveriesWithTimeout() {
assertThat(recoveryResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6255")
dreamer-89 marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Checks that the segment replication stats API returns a partial response, with per-shard
 * failures, when a transport timeout is simulated for {@code SegmentReplicationStatsAction}.
 *
 * The test first asserts the fault-free case (all shard copies successful), then injects the
 * timeout and asserts that half of the shard copies succeed and half fail with a
 * {@code ReceiveTimeoutTransportException} in the failure reason.
 *
 * NOTE(review): {@code simulateTimeoutAtTransport} is defined outside this view; presumably it
 * makes requests for the given action between the two named nodes time out - confirm against
 * the helper.
 */
public void testSegmentReplicationStatsWithTimeout() {
// Start one cluster-manager-only node and two data-only nodes; every node gets the
// REPLICATION_TYPE feature flag set to "true" on top of the inherited feature flag settings.
internalCluster().startClusterManagerOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String dataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);
String anotherDataNode = internalCluster().startDataOnlyNode(
Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REPLICATION_TYPE, "true").build()
);

// 4 shards with 1 replica each and SEGMENT replication; the assertions below therefore
// expect numShards * 2 shard copies in total.
int numShards = 4;
assertAcked(
prepareCreate(
"test-index",
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
// Index a randomized number of documents (between 50 and 100) with ids "0".."numDocs-1".
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index("test-index", "doc", Integer.toString(i));
}
refresh("test-index");
ensureSearchable("test-index");

// Happy case: before any fault is injected, every shard copy is expected to be successful.
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats()
.get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards * 2));

// simulate timeout on bad node.
simulateTimeoutAtTransport(dataNode, anotherDataNode, SegmentReplicationStatsAction.NAME);

// verify response with bad node: the total is unchanged, but only numShards copies succeed and
// the remaining numShards are reported as failures caused by the transport timeout.
segmentReplicationStatsResponse = dataNodeClient().admin().indices().prepareSegmentReplicationStats().get();
assertThat(segmentReplicationStatsResponse.getTotalShards(), equalTo(numShards * 2));
assertThat(segmentReplicationStatsResponse.getSuccessfulShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getFailedShards(), equalTo(numShards));
assertThat(segmentReplicationStatsResponse.getShardFailures()[0].reason(), containsString("ReceiveTimeoutTransportException"));
}

public void testStatsWithTimeout() {
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,55 @@
import org.opensearch.transport.TransportService;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationStatsIT extends SegmentReplicationBaseIT {

public void testSegmentReplicationStatsResponse() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
ensureGreen(INDEX_NAME);

// index 10 docs
for (int i = 0; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().actionGet();
internalCluster().startClusterManagerOnlyNode();
String dataNode = internalCluster().startDataOnlyNode();
String anotherDataNode = internalCluster().startDataOnlyNode();

int numShards = 4;
assertAcked(
prepareCreate(
INDEX_NAME,
0,
Settings.builder()
.put("number_of_shards", numShards)
.put("number_of_replicas", 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
)
);
ensureGreen();
final long numDocs = scaledRandomIntBetween(50, 100);
for (int i = 0; i < numDocs; i++) {
index(INDEX_NAME, "doc", Integer.toString(i));
}
refresh(INDEX_NAME);
waitForSearchableDocs(10L, asList(primaryNode, replicaNode));
ensureSearchable(INDEX_NAME);

assertBusy(() -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned here, I have seen one instance where this test failed. Maybe we can increase the assertBusy timeout and see if it helps.

Copy link
Member Author

@Rishikesh1159 Rishikesh1159 Feb 21, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sure. Maybe I can increase it to 60 seconds, just to be certain that replication has completed, as you said before.

final SegmentReplicationStatsResponse response = client().admin()
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
.indices()
.prepareSegmentReplicationStats(INDEX_NAME)
.execute()
.actionGet();
// Verify API Response
assertEquals(response.shardSegmentReplicationStates().size(), SHARD_COUNT);
assertEquals(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(), SegmentReplicationState.Stage.DONE);
assertTrue(response.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0);
});
assertEquals(segmentReplicationStatsResponse.shardSegmentReplicationStates().size(), 1);
assertEquals(segmentReplicationStatsResponse.getTotalShards(), numShards * 2);
assertEquals(segmentReplicationStatsResponse.getSuccessfulShards(), numShards * 2);
assertEquals(
segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
SegmentReplicationState.Stage.DONE
);
assertTrue(
segmentReplicationStatsResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0
);
}, 1, TimeUnit.MINUTES);
}

public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throws Exception {
Expand Down Expand Up @@ -121,6 +140,7 @@ public void testSegmentReplicationStatsResponseForActiveAndCompletedOnly() throw
completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getStage(),
SegmentReplicationState.Stage.DONE
);
assertTrue(completedOnlyResponse.shardSegmentReplicationStates().get(INDEX_NAME).get(0).getIndex().recoveredFileCount() > 0);
waitForAssertions.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected SegmentReplicationState shardOperation(SegmentReplicationStatsRequest
singleIndexWithSegmentReplicationDisabled = shardRouting.getIndexName();
return null;
}
if (indexShard.indexSettings().isSegRepEnabled() == false) {
if (indexShard.indexSettings().isSegRepEnabled() == false || shardRouting.primary()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this change needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the shard is a primary, we don't even need to make a call to get the segment replication state, as segment replication only happens on replica shards. Without this primary check, it would make a call to get the segment replication state, check some collections, and eventually return null on a primary shard. So with this additional condition we simply return null early if it is a primary shard.

return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* ReplicationState implementation to track Segment Replication events.
Expand Down Expand Up @@ -121,23 +122,28 @@ public Map<String, Long> getTimingData() {
}

public TimeValue getReplicatingStageTime() {
return new TimeValue(timingData.get(Stage.REPLICATING.toString()));
long time = timingData.getOrDefault(Stage.REPLICATING.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetCheckpointInfoStageTime() {
return new TimeValue(timingData.get(Stage.GET_CHECKPOINT_INFO.toString()));
long time = timingData.getOrDefault(Stage.GET_CHECKPOINT_INFO.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFileDiffStageTime() {
return new TimeValue(timingData.get(Stage.FILE_DIFF.toString()));
long time = timingData.getOrDefault(Stage.FILE_DIFF.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getGetFileStageTime() {
return new TimeValue(timingData.get(Stage.GET_FILES.toString()));
long time = timingData.getOrDefault(Stage.GET_FILES.toString(), 0L);
return new TimeValue(time);
}

public TimeValue getFinalizeReplicationStageTime() {
return new TimeValue(timingData.get(Stage.FINALIZE_REPLICATION.toString()));
long time = timingData.getOrDefault(Stage.FINALIZE_REPLICATION.toString(), 0L);
return new TimeValue(time);
}

public SegmentReplicationState(
Expand All @@ -153,7 +159,7 @@ public SegmentReplicationState(
this.sourceDescription = sourceDescription;
this.targetNode = targetNode;
// Timing data will have as many entries as stages, plus one
timingData = new HashMap<>(Stage.values().length + 1);
timingData = new ConcurrentHashMap<>(Stage.values().length + 1);
overallTimer = new ReplicationTimer();
stageTimer = new ReplicationTimer();
setStage(Stage.INIT);
Expand All @@ -180,7 +186,13 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(replicationId);
overallTimer.writeTo(out);
stageTimer.writeTo(out);
out.writeMap(timingData, StreamOutput::writeString, StreamOutput::writeLong);

// Copy of timingData is created to avoid concurrent modification of timingData map.
Map<String, Long> timingDataCopy = new HashMap<>();
for (Map.Entry<String, Long> entry : timingData.entrySet()) {
timingDataCopy.put(entry.getKey(), entry.getValue());
}
out.writeMap(timingDataCopy, StreamOutput::writeString, StreamOutput::writeLong);
out.writeString(sourceDescription);
targetNode.writeTo(out);
}
Expand Down Expand Up @@ -257,22 +269,20 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.timeField(Fields.STOP_TIME_IN_MILLIS, Fields.STOP_TIME, getTimer().stopTime());
}
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, new TimeValue(getTimer().time()));
if (sourceDescription != null) {
builder.field(Fields.SOURCE, getSourceDescription());
}
builder.field(Fields.SOURCE, getSourceDescription());

builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();

if (targetNode != null) {
builder.startObject(Fields.TARGET);
builder.field(Fields.ID, targetNode.getId());
builder.field(Fields.HOST, targetNode.getHostName());
builder.field(Fields.TRANSPORT_ADDRESS, targetNode.getAddress().toString());
builder.field(Fields.IP, targetNode.getHostAddress());
builder.field(Fields.NAME, targetNode.getName());
builder.endObject();
}
builder.startObject(SegmentReplicationState.Fields.INDEX);
index.toXContent(builder, params);
builder.endObject();

builder.field(Fields.REPLICATING_STAGE, getReplicatingStageTime());
builder.field(Fields.GET_CHECKPOINT_INFO_STAGE, getGetCheckpointInfoStageTime());
builder.field(Fields.FILE_DIFF_STAGE, getFileDiffStageTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,6 @@ protected Table getTableWithHeader(RestRequest request) {
t.startHeaders()
.addCell("index", "alias:i,idx;desc:index name")
.addCell("shardId", "alias:s;desc: shard Id")
.addCell("start_time", "default:false;alias:start;desc:segment replication start time")
.addCell("start_time_millis", "default:false;alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "default:false;alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "default:false;alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("time", "alias:t,ti;desc:segment replication time")
.addCell("stage", "alias:st;desc:segment replication stage")
.addCell("source_description", "alias:sdesc;desc:source description")
Expand All @@ -106,7 +102,11 @@ protected Table getTableWithHeader(RestRequest request) {
.addCell("bytes_fetched", "alias:bf;desc:bytes fetched")
.addCell("bytes_percent", "alias:bp;desc:percent of bytes fetched");
if (detailed) {
t.addCell("files", "alias:f;desc:number of files to fetch")
t.addCell("start_time", "alias:start;desc:segment replication start time")
.addCell("start_time_millis", "alias:start_millis;desc:segment replication start time in epoch milliseconds")
.addCell("stop_time", "alias:stop;desc:segment replication stop time")
.addCell("stop_time_millis", "alias:stop_millis;desc:segment replication stop time in epoch milliseconds")
.addCell("files", "alias:f;desc:number of files to fetch")
.addCell("files_total", "alias:tf;desc:total number of files")
.addCell("bytes", "alias:b;desc:number of bytes to fetch")
.addCell("bytes_total", "alias:tb;desc:total number of bytes")
Expand Down Expand Up @@ -162,10 +162,6 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.startRow();
t.addCell(index);
t.addCell(state.getShardRouting().shardId().id());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(new TimeValue(state.getTimer().time()));
t.addCell(state.getStage().toString().toLowerCase(Locale.ROOT));
t.addCell(state.getSourceDescription());
Expand All @@ -176,6 +172,10 @@ public int compare(SegmentReplicationState o1, SegmentReplicationState o2) {
t.addCell(state.getIndex().recoveredBytes());
t.addCell(String.format(Locale.ROOT, "%1.1f%%", state.getIndex().recoveredBytesPercent()));
if (detailed) {
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()));
t.addCell(state.getTimer().startTime());
t.addCell(XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()));
t.addCell(state.getTimer().stopTime());
t.addCell(state.getIndex().totalRecoverFiles());
t.addCell(state.getIndex().totalFileCount());
t.addCell(state.getIndex().totalRecoverBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.Table;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentOpenSearchExtension;
import org.opensearch.index.Index;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.replication.SegmentReplicationState;
Expand Down Expand Up @@ -92,10 +91,6 @@ public void testSegmentReplicationAction() {
final List<String> expectedHeaders = Arrays.asList(
"index",
"shardId",
"start_time",
"start_time_millis",
"stop_time",
"stop_time_millis",
"time",
"stage",
"source_description",
Expand All @@ -118,10 +113,6 @@ public void testSegmentReplicationAction() {
final List<Object> expectedValues = Arrays.asList(
"index",
i,
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().startTime()),
state.getTimer().startTime(),
XContentOpenSearchExtension.DEFAULT_DATE_PRINTER.print(state.getTimer().stopTime()),
state.getTimer().stopTime(),
new TimeValue(state.getTimer().time()),
state.getStage().name().toLowerCase(Locale.ROOT),
state.getSourceDescription(),
Expand Down