[Segment Replication] Ensure replica's store always contains the previous commit point. #2551

Merged: 3 commits, Mar 25, 2022
@@ -8,12 +8,15 @@

package org.opensearch.indices.replication;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -23,17 +26,21 @@ public class SegmentReplicationIT extends OpenSearchIntegTestCase {
private static final int SHARD_COUNT = 1;
private static final int REPLICA_COUNT = 1;

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build();
}

public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(
INDEX_NAME,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
.put(IndexMetadata.SETTING_SEGMENT_REPLICATION, true)
.build()
);
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
@@ -56,4 +63,64 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicationAfterForceMerge() throws Exception {
final String nodeA = internalCluster().startNode();
final String nodeB = internalCluster().startNode();
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), initialDocCount)) {
waitForDocs(initialDocCount, indexer);
}
flush(INDEX_NAME);
// wait a short amount of time to give replication a chance to complete.
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

final int additionalDocCount = scaledRandomIntBetween(0, 200);
final int totalDocs = initialDocCount + additionalDocCount;
try (BackgroundIndexer indexer = new BackgroundIndexer(INDEX_NAME, "_doc", client(), additionalDocCount)) {
waitForDocs(additionalDocCount, indexer);
}
// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that replicas preserve these files so the local store is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
Thread.sleep(1000);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs);
}

public void testReplicaSetupAfterPrimaryIndexesDocs() throws Exception {
final String nodeA = internalCluster().startNode();
createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
ensureGreen(INDEX_NAME);

// Index a doc to create the first set of segments. _s1.si
client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
// Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
flushAndRefresh(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

// Index to create another segment
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

// Force a merge here so that the in memory SegmentInfos does not reference old segments on disk.
// This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt.
client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
refresh(INDEX_NAME);

final String nodeB = internalCluster().startNode();
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);
ensureGreen(INDEX_NAME);
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
}
}
@@ -235,7 +235,7 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {};
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {}

public long getProcessedLocalCheckpoint() {
return 0L;
@@ -62,10 +62,6 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.BytesRef;
@@ -119,7 +115,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -341,21 +336,13 @@ public InternalEngine(EngineConfig engineConfig) {
}

@Override
public void updateCurrentInfos(byte[] infosBytes, long gen, long seqNo) throws IOException {
assert engineConfig.isReadOnly() == true : "Only read-only replicas should update Infos";
SegmentInfos infos = SegmentInfos.readCommit(this.store.directory(), toIndexInput(infosBytes), gen);
assert gen == infos.getGeneration();
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
assert engineConfig.isReadOnly() : "Only replicas should update Infos";
externalReaderManager.internalReaderManager.updateSegments(infos);
externalReaderManager.maybeRefresh();
localCheckpointTracker.markSeqNoAsProcessed(seqNo);
}

private ChecksumIndexInput toIndexInput(byte[] input) {
return new BufferedChecksumIndexInput(
new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos")
);
}

private LocalCheckpointTracker createLocalCheckpointTracker(
BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier
) throws IOException {
39 changes: 15 additions & 24 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -166,7 +166,6 @@
import org.opensearch.indices.replication.copy.ReplicationCheckpoint;
import org.opensearch.indices.replication.copy.ReplicationFailedException;
import org.opensearch.indices.replication.copy.SegmentReplicationState;
import org.opensearch.indices.replication.copy.TrackShardResponse;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
@@ -1349,6 +1348,10 @@ public void rollTranslogGeneration() {
}

public void forceMerge(ForceMergeRequest forceMerge) throws IOException {
if (indexSettings.isSegrepEnabled() && shardRouting.primary() == false) {
// With segment replication enabled, replicas do not perform this operation.
return;
}
verifyActive();
if (logger.isTraceEnabled()) {
logger.trace("force merge with {}", forceMerge);
@@ -1444,8 +1447,8 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
);
}

public void updateCurrentInfos(long gen, byte[] infosBytes, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infosBytes, gen, seqNo);
public void updateCurrentInfos(SegmentInfos infos, long seqNo) throws IOException {
getEngine().updateCurrentInfos(infos, seqNo);
}

/**
@@ -1952,6 +1955,9 @@ public void openEngineAndSkipTranslogRecovery() throws IOException {
// TODO: Segrep - fix initial recovery stages from ReplicationTarget.
if (indexSettings.isSegrepEnabled() == false) {
recoveryState.validateCurrentStage(RecoveryState.Stage.TRANSLOG);
} else {
recoveryState.setStage(RecoveryState.Stage.VERIFY_INDEX);
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
}
loadGlobalCheckpointToReplicationTracker();
innerOpenEngineAndTranslog(replicationTracker);
@@ -3011,25 +3017,14 @@ public void startRecovery(
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
if (indexSettings.isSegrepEnabled()) {
IndexShard indexShard = this;
segmentReplicationReplicaService.prepareForReplication(
// Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
// and started with the latest set of segments.
segmentReplicationReplicaService.startRecovery(
this,
recoveryState.getTargetNode(),
recoveryState.getSourceNode(),
new ActionListener<TrackShardResponse>() {
@Override
public void onResponse(TrackShardResponse unused) {
segRepListener.onReplicationDone(segRepState);
recoveryState.getIndex().setFileDetailsComplete();
finalizeRecovery();
postRecovery("Shard setup complete.");
}

@Override
public void onFailure(Exception e) {
segRepListener.onReplicationFailure(segRepState, new ReplicationFailedException(indexShard, e), true);
}
}
replicationSource,
segRepListener
);
} else {
peerRecoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
@@ -3669,22 +3664,19 @@ public synchronized void onNewCheckpoint(
return;
}
if (isReplicating()) {
logger.info("Ignore - shard is currently replicating to a checkpoint");
logger.debug("Ignore - shard is currently replicating to a checkpoint");
return;
}
try {
markAsReplicating();
final ReplicationCheckpoint checkpoint = request.getCheckpoint();
logger.trace("Received new checkpoint {}", checkpoint);
// TODO: segrep - these are the states set after we perform our initial store recovery.
segmentReplicationReplicaService.startReplication(
checkpoint,
this,
source,
new SegmentReplicationReplicaService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
markReplicationComplete();
Review thread on the removal of markReplicationComplete():

Contributor: Why would we remove this when we continue to use MarkAsReplicating()?

Member (author): I moved this to when ReplicationTarget's finalize method completes because I didn't like SegmentReplicationListener being responsible for marking it as complete. With that said, I think we can do better by moving it to the onDone/onCancel/onFail methods of SegmentReplicationTarget. Will also move markAsReplicating to ReplicationTarget as well so these classes don't have to manage that.

Member (author): Cleaned this up in the latest commit, what do you think?

Member: Wasn't the plan to move markAsReplicating to ReplicationTarget as well? Right now, that seems to be in ReplicaService, so the lifecycle management is distributed across two classes, which smells. How do we improve this?

Member (author): Hmm, maybe we can completely remove this state and make a synchronized check of whether ReplicationCollection has an ongoing replication for the shard. Would like to take this as a follow-up change if that's ok.

Member: Could you create a task to track it, so we don't forget?
logger.debug("Replication complete to {}", getLatestReplicationCheckpoint());
}

Expand All @@ -3694,7 +3686,6 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
markReplicationComplete();
logger.error("Failure", e);
}
}
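The review thread in this file discusses consolidating the replicating-state bookkeeping into the replication target rather than splitting it between SegmentReplicationListener and the replica service. Below is a rough sketch of that idea only; every name is hypothetical and none of this is code from the PR. The flag is set when the target starts and cleared on every terminal path, so no listener has to remember to do it.

// Hypothetical sketch: consolidates markAsReplicating / markReplicationComplete
// into the target, per the review discussion. No names here are asserted to
// exist in OpenSearch.
interface ReplicatingShard {
    void markAsReplicating();
    void markReplicationComplete();
}

abstract class SketchSegmentReplicationTarget {
    private final ReplicatingShard shard;

    SketchSegmentReplicationTarget(ReplicatingShard shard) {
        this.shard = shard;
    }

    final void start() {
        shard.markAsReplicating(); // set by the target itself, not by the caller
        doReplication();
    }

    // Every terminal hook clears the flag.
    final void onDone()             { shard.markReplicationComplete(); }
    final void onFail(Exception e)  { shard.markReplicationComplete(); }
    final void onCancel(String why) { shard.markReplicationComplete(); }

    protected abstract void doReplication();
}

The follow-up idea raised in the thread, dropping the flag entirely and instead asking ReplicationCollection whether the shard already has an ongoing replication, would remove even this bookkeeping.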
71 changes: 51 additions & 20 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
@@ -672,26 +673,7 @@ private static void failIfCorrupted(Directory directory) throws IOException {
public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) throws IOException {
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile) || sourceMetadata.contains(existingFile)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
cleanupFiles(reason, sourceMetadata, null);
directory.syncMetaData();
final Store.MetadataSnapshot metadataOrEmpty = getMetadata((IndexCommit) null);
verifyAfterCleanup(sourceMetadata, metadataOrEmpty);
Expand All @@ -700,6 +682,55 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param remoteSnapshot The remote snapshot sent from primary shards.
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndVerify(String reason, MetadataSnapshot remoteSnapshot, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
final Store.MetadataSnapshot latestCommitPointMetadata = getMetadata((IndexCommit) null);
cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
verifyAfterCleanup(remoteSnapshot, localSnapshot);
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| remoteSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
// FNF should not happen since we hold a write lock?
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
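For context, a minimal sketch of how a replica might drive the new two-snapshot cleanupAndVerify overload added above. The helper class is hypothetical; only the Store types and the overload from this diff are taken from the PR. The replica passes the primary's metadata snapshot together with a snapshot built from its in-memory SegmentInfos; the method deletes files not referenced by the primary's snapshot or by the last commit point on disk (which it reads itself), then verifies the local snapshot against the remote one.

import java.io.IOException;

import org.opensearch.index.store.Store;

// Hypothetical helper, not part of this PR: shows the intended call pattern for
// the two-snapshot cleanup added above.
final class ReplicaCleanupSketch {
    private ReplicaCleanupSketch() {}

    static void cleanUpAfterCopy(
        Store store,
        Store.MetadataSnapshot remoteSnapshot, // metadata snapshot sent from the primary
        Store.MetadataSnapshot localSnapshot   // snapshot built from the replica's in-memory SegmentInfos
    ) throws IOException {
        // The method reads the latest segments_N on disk itself and keeps files named
        // there or in remoteSnapshot, so the previous commit point stays intact even
        // when the in-memory view is ahead of it; localSnapshot is then used to verify
        // the store against the primary's snapshot.
        store.cleanupAndVerify("segment replication finalize", remoteSnapshot, localSnapshot);
    }
}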
@@ -767,7 +767,7 @@ private ShardRoutingReplicationListener(final ShardRouting shardRouting, final l

@Override
public void onReplicationDone(final SegmentReplicationState state) {
logger.info("Shard setup complete, ready for segment copy.");
logger.trace("Shard setup complete, ready for segment copy.");
shardStateAction.shardStarted(shardRouting, primaryTerm, "after replication", SHARD_STATE_ACTION_LISTENER);
}
