Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure latest replication checkpoint post failover has correct operational primary term #11990

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.indices.IndicesService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.test.InternalTestCluster;
Expand Down Expand Up @@ -135,6 +140,54 @@ private void testRestoreFlow(int numberOfIterations, boolean invokeFlush, boolea
restoreAndVerify(shardCount, 0, indexStats);
}

/**
 * Verifies that after a primary failover the promoted replica's latest replication
 * checkpoint carries a bumped operational primary term (old term + 1), so that only
 * one writer per term can upload segments to the remote store.
 */
public void testMultipleWriters() throws Exception {
    prepareCluster(1, 2, INDEX_NAME, 1, 1);
    Map<String, Long> indexStats = indexData(randomIntBetween(2, 5), true, true, INDEX_NAME);
    assertEquals(2, getNumShards(INDEX_NAME).totalNumShards);

    // ensure replica has latest checkpoint
    flushAndRefresh(INDEX_NAME);
    flushAndRefresh(INDEX_NAME);

    // Capture the remote segment metadata (and its primary term) before failover.
    Index indexObj = clusterService().state().metadata().indices().get(INDEX_NAME).getIndex();
    IndicesService indicesService = internalCluster().getInstance(IndicesService.class, primaryNodeName(INDEX_NAME));
    IndexService indexService = indicesService.indexService(indexObj);
    IndexShard indexShard = indexService.getShard(0);
    RemoteSegmentMetadata remoteSegmentMetadataBeforeFailover = indexShard.getRemoteDirectory().readLatestMetadataFile();

    // ensure all segments synced to replica
    assertBusy(
        () -> assertHitCount(
            client(primaryNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(),
            indexStats.get(TOTAL_OPERATIONS)
        ),
        30,
        TimeUnit.SECONDS
    );
    assertBusy(
        () -> assertHitCount(
            client(replicaNodeName(INDEX_NAME)).prepareSearch(INDEX_NAME).setSize(0).get(),
            indexStats.get(TOTAL_OPERATIONS)
        ),
        30,
        TimeUnit.SECONDS
    );

    // Stop the primary node so the replica is promoted to primary.
    String newPrimaryNodeName = replicaNodeName(INDEX_NAME);
    internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primaryNodeName(INDEX_NAME)));
    ensureYellow(INDEX_NAME);

    indicesService = internalCluster().getInstance(IndicesService.class, newPrimaryNodeName);
    indexService = indicesService.indexService(indexObj);
    indexShard = indexService.getShard(0);
    IndexShard finalIndexShard = indexShard;
    assertBusy(() -> assertTrue(finalIndexShard.isStartedPrimary() && finalIndexShard.isPrimaryMode()));
    // assertEquals(expected, actual): the post-failover checkpoint term must equal
    // the pre-failover term bumped by one. (Arguments were previously swapped,
    // which yields misleading "expected X but was Y" messages on failure.)
    assertEquals(
        remoteSegmentMetadataBeforeFailover.getPrimaryTerm() + 1,
        finalIndexShard.getLatestSegmentInfosAndCheckpoint().v2().getPrimaryTerm()
    );
}

/**
* Helper function to test restoring an index having replicas from remote store when all the nodes housing the primary/replica drop.
* @param numberOfIterations Number of times a refresh/flush should be invoked, followed by indexing some data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1697,7 +1697,8 @@ ReplicationCheckpoint computeReplicationCheckpoint(SegmentInfos segmentInfos) th
}
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
if (latestReplicationCheckpoint.getSegmentInfosVersion() == segmentInfos.getVersion()
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()) {
&& latestReplicationCheckpoint.getSegmentsGen() == segmentInfos.getGeneration()
            && latestReplicationCheckpoint.getPrimaryTerm() == getOperationPrimaryTerm()) {
return latestReplicationCheckpoint;
}
final Map<String, StoreFileMetadata> metadataMap = store.getSegmentMetadataMap(segmentInfos);
Expand Down Expand Up @@ -2014,7 +2015,7 @@ public void close(String reason, boolean flushEngine, boolean deleted) throws IO
/*
ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003
*/
private RemoteSegmentStoreDirectory getRemoteDirectory() {
public RemoteSegmentStoreDirectory getRemoteDirectory() {
assert indexSettings.isRemoteStoreEnabled();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -209,6 +210,16 @@ private boolean syncSegments() {
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
final ReplicationCheckpoint checkpoint = indexShard.computeReplicationCheckpoint(segmentInfos);
if (checkpoint.getPrimaryTerm() != indexShard.getOperationPrimaryTerm()) {
throw new IllegalStateException(
String.format(
Locale.ROOT,
"primaryTerm mismatch during segments upload to remote store [%s] != [%s]",
checkpoint.getPrimaryTerm(),
indexShard.getOperationPrimaryTerm()
)
);
}
// Capture replication checkpoint before uploading the segments as upload can take some time and checkpoint can
// move.
long lastRefreshedCheckpoint = ((InternalEngine) indexShard.getEngine()).lastRefreshedCheckpoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,33 @@ public void testTrackerData() throws Exception {
assertBusy(() -> assertNoLag(tracker));
}

/**
 * Verifies that a segment upload to the remote store fails when the replication
 * checkpoint's primary term does not match the primary term tracked by the shard.
 */
public void testRefreshFailedDueToPrimaryTermMisMatch() throws Exception {
    int attempts = 1;
    int publishSucceedOnAttempt = 0;
    // Counts down once per remote refresh attempt; IndexShard.isPrimaryStarted() is
    // spied on to confirm the refresh ran exactly the expected number of times.
    CountDownLatch refreshAttemptLatch = new CountDownLatch(attempts);

    // Remains at 1 because the upload is expected to fail the primary-term
    // validation, so success is never signalled.
    CountDownLatch uploadSuccessLatch = new CountDownLatch(1);
    CountDownLatch checkpointPublishReachedLatch = new CountDownLatch(0);
    Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> listenerAndTrackerFactory =
        mockIndexShardWithRetryAndScheduleRefresh(
            attempts,
            refreshAttemptLatch,
            uploadSuccessLatch,
            publishSucceedOnAttempt,
            checkpointPublishReachedLatch,
            false
        );

    assertBusy(
        () -> assertEquals(
            1,
            listenerAndTrackerFactory.v2().getRemoteSegmentTransferTracker(indexShard.shardId()).getTotalUploadsFailed()
        )
    );
    assertBusy(() -> assertEquals(0, refreshAttemptLatch.getCount()));
    assertBusy(() -> assertEquals(1, uploadSuccessLatch.getCount()));
    assertBusy(() -> assertEquals(0, checkpointPublishReachedLatch.getCount()));
}

private void assertNoLag(RemoteSegmentTransferTracker tracker) {
assertEquals(0, tracker.getRefreshSeqNoLag());
assertEquals(0, tracker.getBytesLag());
Expand Down Expand Up @@ -460,6 +487,24 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch
) throws IOException {
return mockIndexShardWithRetryAndScheduleRefresh(
succeedOnAttempt,
refreshCountLatch,
successLatch,
succeedCheckpointPublishOnAttempt,
reachedCheckpointPublishLatch,
true
);
}

private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIndexShardWithRetryAndScheduleRefresh(
int succeedOnAttempt,
CountDownLatch refreshCountLatch,
CountDownLatch successLatch,
int succeedCheckpointPublishOnAttempt,
CountDownLatch reachedCheckpointPublishLatch,
boolean mockPrimaryTerm
) throws IOException {
// Create index shard that we will be using to mock different methods in IndexShard for the unit test
indexShard = newStartedShard(
Expand Down Expand Up @@ -500,6 +545,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory);

// Mock indexShard.getOperationPrimaryTerm()
if (mockPrimaryTerm) {
when(shard.getOperationPrimaryTerm()).thenReturn(indexShard.getOperationPrimaryTerm());
}
when(shard.getLatestReplicationCheckpoint()).thenReturn(indexShard.getLatestReplicationCheckpoint());

// Mock indexShard.routingEntry().primary()
Expand Down
Loading