Skip to content

Commit

Permalink
Add replication orchestration for a single shard (opensearch-project#…
Browse files Browse the repository at this point in the history
…3533)

* implement segment replication target

Signed-off-by: Poojita Raj <[email protected]>

* test added

Signed-off-by: Poojita Raj <[email protected]>

* changes to tests + finalizeReplication

Signed-off-by: Poojita Raj <[email protected]>

* fix style check

Signed-off-by: Poojita Raj <[email protected]>

* addressing comments + fix gradle check

Signed-off-by: Poojita Raj <[email protected]>

* added test + addressed review comments

Signed-off-by: Poojita Raj <[email protected]>
  • Loading branch information
Poojita-Raj authored Jun 22, 2022
1 parent 60d7a09 commit 3d4d5ca
Show file tree
Hide file tree
Showing 12 changed files with 759 additions and 59 deletions.
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/OpenSearchException.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.Version.V_2_1_0;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName;
Expand Down Expand Up @@ -1594,6 +1595,12 @@ private enum OpenSearchExceptionHandle {
org.opensearch.transport.NoSeedNodeLeftException::new,
160,
LegacyESVersion.V_7_10_0
),
REPLICATION_FAILED_EXCEPTION(
org.opensearch.indices.replication.common.ReplicationFailedException.class,
org.opensearch.indices.replication.common.ReplicationFailedException::new,
161,
V_2_1_0
);

final Class<? extends OpenSearchException> exceptionClass;
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import org.opensearch.index.engine.EngineConfigFactory;
import org.opensearch.index.engine.EngineException;
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.engine.NRTReplicationEngine;
import org.opensearch.index.engine.ReadOnlyEngine;
import org.opensearch.index.engine.RefreshFailedEngineException;
import org.opensearch.index.engine.SafeCommitInfo;
Expand Down Expand Up @@ -161,8 +162,8 @@
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.rest.RestStatus;
Expand Down Expand Up @@ -1363,6 +1364,20 @@ public GatedCloseable<IndexCommit> acquireLastIndexCommit(boolean flushFirst) th
}
}

private Optional<NRTReplicationEngine> getReplicationEngine() {
if (getEngine() instanceof NRTReplicationEngine) {
return Optional.of((NRTReplicationEngine) getEngine());
} else {
return Optional.empty();
}
}

public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException {
if (getReplicationEngine().isPresent()) {
getReplicationEngine().get().updateSegments(infos, seqNo);
}
}

/**
* Snapshots the most recent safe index commit from the currently running engine.
* All index files referenced by this index commit won't be freed until the commit/snapshot is closed.
Expand Down
46 changes: 46 additions & 0 deletions server/src/main/java/org/opensearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.Nullable;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
Expand Down Expand Up @@ -706,6 +707,51 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}
}

/**
* This method deletes every file in this store that is not contained in either the remote or local metadata snapshots.
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param localSnapshot The local snapshot from in memory SegmentInfos.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException {
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo()));
} finally {
metadataLock.writeLock().unlock();
}
}

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
}
try {
directory.deleteFile(reason, existingFile);
} catch (IOException ex) {
if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
// TODO do we need to also fail this if we can't delete the pending commit file?
// if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit
// point around?
throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex);
}
logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex);
// ignore, we don't really care, will get deleted later on
}
}
}

// pkg private for testing
final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) {
final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
private static final String RECOVERY_PREFIX = "recovery.";

private final DiscoveryNode sourceNode;
private final CancellableThreads cancellableThreads;
protected final MultiFileWriter multiFileWriter;
protected final Store store;

// latch that can be used to blockingly wait for RecoveryTarget to be closed
private final CountDownLatch closedLatch = new CountDownLatch(1);
Expand All @@ -93,13 +91,10 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH
*/
public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) {
super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener);
this.cancellableThreads = new CancellableThreads();
this.sourceNode = sourceNode;
indexShard.recoveryStats().incCurrentAsTarget();
this.store = indexShard.store();
final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + ".";
this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount);
store.incRef();
}

/**
Expand Down Expand Up @@ -132,11 +127,6 @@ public CancellableThreads cancellableThreads() {
return cancellableThreads;
}

public Store store() {
ensureRefCount();
return store;
}

public String description() {
return "recovery from " + source();
}
Expand Down Expand Up @@ -258,14 +248,6 @@ protected void onDone() {
indexShard.postRecovery("peer recovery done");
}

/**
* if {@link #cancellableThreads()} was used, the threads will be interrupted.
*/
@Override
protected void onCancel(String reason) {
cancellableThreads.cancel(reason);
}

/*** Implementation of {@link RecoveryTargetHandler } */

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ public class SegmentReplicationState implements ReplicationState {
public enum Stage {
DONE((byte) 0),

INIT((byte) 1);
INIT((byte) 1),

REPLICATING((byte) 2);

private static final Stage[] STAGES = new Stage[Stage.values().length];

Expand Down Expand Up @@ -56,29 +58,58 @@ public static Stage fromId(byte id) {
}
}

public SegmentReplicationState() {
this.stage = Stage.INIT;
}

private Stage stage;
private final ReplicationLuceneIndex index;
private final ReplicationTimer timer;

public SegmentReplicationState(ReplicationLuceneIndex index) {
stage = Stage.INIT;
this.index = index;
timer = new ReplicationTimer();
timer.start();
}

@Override
public ReplicationLuceneIndex getIndex() {
// TODO
return null;
return index;
}

@Override
public ReplicationTimer getTimer() {
// TODO
return null;
return timer;
}

public Stage getStage() {
return stage;
}

protected void validateAndSetStage(Stage expected, Stage next) {
if (stage != expected) {
assert false : "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])";
throw new IllegalStateException(
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
);
}
stage = next;
}

public void setStage(Stage stage) {
this.stage = stage;
switch (stage) {
case INIT:
this.stage = Stage.INIT;
getIndex().reset();
break;
case REPLICATING:
validateAndSetStage(Stage.INIT, stage);
getIndex().start();
break;
case DONE:
validateAndSetStage(Stage.REPLICATING, stage);
getIndex().stop();
getTimer().stop();
break;
default:
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
}
}
}
Loading

0 comments on commit 3d4d5ca

Please sign in to comment.