Skip to content

Commit

Permalink
Fix Segment Replication integ tests (#2637)
Browse files Browse the repository at this point in the history
* Fix Segment replication integ tests by using a single BackgroundIndexer.

BackgroundIndexers begin indexing docs with a docId of 0 up to the requested numDocs.  Using two overlapped the docIds so counts were incorrect.
This changes our tests to use a single indexer, it also improves assertions on segment data on both shards.

Signed-off-by: Marc Handalian <[email protected]>

* Fix Index shards to correctly compare checkpoint version instead of segment gen when determining
if a received checkpoint should be processed. Without this change replicas will ignore checkpoints for merges.

Signed-off-by: Marc Handalian <[email protected]>

* Updated based on PR feedback.

Rename latestUnprocessedCheckpoint to latestReceivedCheckpoint.
Remove redundant checkpoint variable in onNewCheckpoint.
Rename isValidCheckpoint to shouldProcessCheckpoint.
Add missing isAheadOf check to shouldProcessCheckpoint.
Update SegmentReplicationIT's assertSegmentStats to be more readable.
Removed latest seqNo hack.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 authored Apr 5, 2022
1 parent e4ba853 commit ae214ec
Show file tree
Hide file tree
Showing 9 changed files with 251 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,28 @@

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Assert;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.engine.Segment;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

Expand Down Expand Up @@ -44,24 +58,39 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);

// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
refresh(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
flush(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicationAfterForceMerge() throws Exception {
Expand All @@ -71,29 +100,48 @@ public void testReplicationAfterForceMerge() throws Exception {
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
"_doc",
client(),
-1,
RandomizedTest.scaledRandomIntBetween(2, 5),
false,
random()
)
) {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);

flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int expectedHitCount = initialDocCount + additionalDocCount;

// Index a second set of docs so we can merge into one segment.
indexer.start(additionalDocCount);
waitForDocs(expectedHitCount, indexer);

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that replicas preserve these files so the local store is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
refresh(INDEX_NAME);
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

ensureGreen(INDEX_NAME);
assertSegmentStats(REPLICA_COUNT);
}
flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that replicas preserve these files so the local store is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
public void testReplicaSetupAfterPrimaryIndexesDocs() {
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);
Expand Down Expand Up @@ -122,5 +170,63 @@ public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
ensureGreen(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertSegmentStats(REPLICA_COUNT);
}

private void assertSegmentStats(int numberOfReplicas) {
client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() {
@Override
public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {

List<ShardSegments[]> segmentsByIndex = indicesSegmentResponse.getIndices()
.values()
.stream() // get list of IndexSegments
.flatMap(is -> is.getShards().values().stream()) // Map to shard replication group
.map(IndexShardSegments::getShards) // get list of segments across replication group
.collect(Collectors.toList());

// There will be an entry in the list for each index.
for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {

// Separate Primary & replica shards ShardSegments.
final Map<Boolean, List<ShardSegments>> segmentListMap = Arrays.stream(replicationGroupSegments)
.collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

assertEquals("There should only be one primary in the replicationGroup", primaryShardSegmentsList.size(), 1);
final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();

// create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on
// replicas.
final Map<String, Segment> primarySegmentsMap = primaryShardSegments.getSegments()
.stream()
.collect(Collectors.toMap(Segment::getName, Function.identity()));
// For every replica, ensure that its segments are in the same state as on the primary.
// It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a
// list comparison.
// This equality check includes search/committed properties on the Segment. Combined with docCount checks,
// this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest
// commit point, even if they are not searchable.
assertEquals(
"There should be a ShardSegment entry for each replica in the replicationGroup",
numberOfReplicas,
replicaShardSegments.size()
);

for (ShardSegments shardSegment : replicaShardSegments) {
for (Segment replicaSegment : shardSegment.getSegments()) {
final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName());
assertEquals("Replica's segment should be identical to primary's version", replicaSegment, primarySegment);
}
}
}
}

@Override
public void onFailure(Exception e) {
Assert.fail("Error fetching segment stats");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}
public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {}

public long getProcessedLocalCheckpoint() {
return 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.OpenSearchMergePolicy;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.store.Store;
import org.opensearch.index.translog.DefaultTranslogDeletionPolicy;
import org.opensearch.index.translog.Translog;
import org.opensearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -336,11 +337,28 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo)
throws IOException {
assert engineConfig.isReadOnly() : "Only replicas should update Infos";

store.incRef();
try {
refreshLastCommittedSegmentInfos();
// clean up the local store of old segment files
// and validate the latest segment infos against the snapshot sent from the primary shard.
store.cleanupAndVerify(
"finalize - clean with in memory infos",
expectedMetadata,
store.getMetadata(infos),
store.getMetadata(lastCommittedSegmentInfos)
);
} finally {
store.decRef();
}
// Update the current infos reference on the Engine's reader.
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
externalReaderManager.maybeRefresh();
}

private LocalCheckpointTracker createLocalCheckpointTracker(
Expand Down
13 changes: 11 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Segment.java
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,17 @@ public boolean equals(Object o) {

Segment segment = (Segment) o;

return Objects.equals(name, segment.name);

return Objects.equals(name, segment.name)
&& Objects.equals(docCount, segment.docCount)
&& Objects.equals(delDocCount, segment.delDocCount)
&& Objects.equals(sizeInBytes, segment.sizeInBytes)
&& Objects.equals(search, segment.search)
&& Objects.equals(committed, segment.committed)
&& Objects.equals(attributes, segment.attributes)
&& Objects.equals(version, segment.version)
&& Objects.equals(compound, segment.compound)
&& Objects.equals(mergeId, segment.mergeId)
&& Objects.equals(generation, segment.generation);
}

@Override
Expand Down
72 changes: 51 additions & 21 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ Runnable getGlobalCheckpointSyncer() {

private final CheckpointRefreshListener checkpointRefreshListener;

private volatile ReplicationCheckpoint latestReceivedCheckpoint;

public IndexShard(
final ShardRouting shardRouting,
final IndexSettings indexSettings,
Expand Down Expand Up @@ -1439,16 +1441,18 @@ public SegmentInfos getLatestSegmentInfos() {
}

public ReplicationCheckpoint getLatestReplicationCheckpoint() {
final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
return new ReplicationCheckpoint(
this.shardId,
getOperationPrimaryTerm(),
getLatestSegmentInfos().getGeneration(),
getProcessedLocalCheckpoint()
latestSegmentInfos.getGeneration(),
getProcessedLocalCheckpoint(),
latestSegmentInfos.getVersion()
);
}

public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infos, seqNo);
public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException {
getEngine().finalizeReplication(infos, expectedMetadata, seqNo);
}

/**
Expand Down Expand Up @@ -3652,32 +3656,36 @@ public synchronized void onNewCheckpoint(
final PrimaryShardReplicationSource source,
final SegmentReplicationReplicaService segmentReplicationReplicaService
) {
logger.debug("Checkpoint received {}", request.getCheckpoint());
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint());
if (localCheckpoint.equals(request.getCheckpoint())) {
logger.debug("Ignore - Shard is already on checkpoint");
return;
}
if (state.equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
return;
}
if (isReplicating()) {
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return;
final ReplicationCheckpoint requestCheckpoint = request.getCheckpoint();
logger.debug("Checkpoint received {}", requestCheckpoint);

if (requestCheckpoint.isAheadOf(latestReceivedCheckpoint)) {
latestReceivedCheckpoint = requestCheckpoint;
}

if (shouldProcessCheckpoint(requestCheckpoint) == false) return;
try {
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
logger.debug("Processing new checkpoint {}", requestCheckpoint);
segmentReplicationReplicaService.startReplication(
checkpoint,
requestCheckpoint,
this,
source,
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
// if we received a checkpoint during the copy event that is ahead of this
// try and process it.
if (latestReceivedCheckpoint.isAheadOf(getLatestReplicationCheckpoint())) {
threadPool.generic()
.execute(
() -> onNewCheckpoint(
new PublishCheckpointRequest(latestReceivedCheckpoint),
source,
segmentReplicationReplicaService
)
);
}
}

@Override
Expand All @@ -3695,6 +3703,28 @@ public void onReplicationFailure(
}
}

private boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
ReplicationCheckpoint localCheckpoint = getLatestReplicationCheckpoint();
logger.debug("Local Checkpoint {}", getLatestReplicationCheckpoint());
if (state.equals(IndexShardState.STARTED) == false) {
logger.debug("Ignore - shard is not started {} {}", recoveryState.getStage(), this.state);
return false;
}
if (isReplicating()) {
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return false;
}
if (localCheckpoint.isAheadOf(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {} that is ahead of {}", localCheckpoint, requestCheckpoint);
return false;
}
if (localCheckpoint.equals(requestCheckpoint)) {
logger.debug("Ignore - Shard is already on checkpoint {}", requestCheckpoint);
return false;
}
return true;
}

public SegmentReplicationState getReplicationState() {
return this.segRepState;
}
Expand Down
Loading

0 comments on commit ae214ec

Please sign in to comment.