diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index a6a12d7ebb4f7..2de25aa1cd6dc 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -67,6 +67,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; +import static org.opensearch.Version.V_2_1_0; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName; @@ -1594,6 +1595,12 @@ private enum OpenSearchExceptionHandle { org.opensearch.transport.NoSeedNodeLeftException::new, 160, LegacyESVersion.V_7_10_0 + ), + REPLICATION_FAILED_EXCEPTION( + org.opensearch.indices.replication.common.ReplicationFailedException.class, + org.opensearch.indices.replication.common.ReplicationFailedException::new, + 161, + V_2_1_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bad412003df26..d083d947dfbc3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -109,6 +109,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.engine.RefreshFailedEngineException; import org.opensearch.index.engine.SafeCommitInfo; @@ -161,8 +162,8 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1363,6 +1364,20 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + private Optional getReplicationEngine() { + if (getEngine() instanceof NRTReplicationEngine) { + return Optional.of((NRTReplicationEngine) getEngine()); + } else { + return Optional.empty(); + } + } + + public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + if (getReplicationEngine().isPresent()) { + getReplicationEngine().get().updateSegments(infos, seqNo); + } + } + /** * Snapshots the most recent safe index commit from the currently running engine. * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index f818456c3a2c8..2309004c0777d 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; @@ -706,6 +707,51 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. + * @param reason the reason for this cleanup operation logged for each deleted file + * @param localSnapshot The local snapshot from in memory SegmentInfos. + * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. + */ + public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { + // fetch a snapshot from the latest on disk Segments_N file. This can be behind + // the passed in local in memory snapshot, so we want to ensure files it references are not removed. + metadataLock.writeLock().lock(); + try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { + cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); + } finally { + metadataLock.writeLock().unlock(); + } + } + + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + throws IOException { + assert metadataLock.isWriteLockedByCurrentThread(); + for (String existingFile : directory.listAll()) { + if (Store.isAutogenerated(existingFile) + || localSnapshot.contains(existingFile) + || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { + // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete + // checksum) + continue; + } + try { + directory.deleteFile(reason, existingFile); + } catch (IOException ex) { + if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { + // TODO do we need to also fail this if we can't delete the pending commit file? + // if one of those files can't be deleted we better fail the cleanup otherwise we might leave an old commit + // point around? + throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 1735bb015c90c..426409f7a5b65 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -77,9 +77,7 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH private static final String RECOVERY_PREFIX = "recovery."; private final DiscoveryNode sourceNode; - private final CancellableThreads cancellableThreads; protected final MultiFileWriter multiFileWriter; - protected final Store store; // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); @@ -93,13 +91,10 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); - this.cancellableThreads = new CancellableThreads(); this.sourceNode = sourceNode; indexShard.recoveryStats().incCurrentAsTarget(); - this.store = indexShard.store(); final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); - store.incRef(); } /** @@ -132,11 +127,6 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public Store store() { - ensureRefCount(); - return store; - } - public String description() { return "recovery from " + source(); } @@ -258,14 +248,6 @@ protected void onDone() { indexShard.postRecovery("peer recovery done"); } - /** - * if {@link #cancellableThreads()} was used, the threads will be interrupted. - */ - @Override - protected void onCancel(String reason) { - cancellableThreads.cancel(reason); - } - /*** Implementation of {@link RecoveryTargetHandler } */ @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index b01016d2a1e62..838c06a4785ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -27,7 +27,9 @@ public class SegmentReplicationState implements ReplicationState { public enum Stage { DONE((byte) 0), - INIT((byte) 1); + INIT((byte) 1), + + REPLICATING((byte) 2); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -56,29 +58,58 @@ public static Stage fromId(byte id) { } } - public SegmentReplicationState() { - this.stage = Stage.INIT; - } - private Stage stage; + private final ReplicationLuceneIndex index; + private final ReplicationTimer timer; + + public SegmentReplicationState(ReplicationLuceneIndex index) { + stage = Stage.INIT; + this.index = index; + timer = new ReplicationTimer(); + timer.start(); + } @Override public ReplicationLuceneIndex getIndex() { - // TODO - return null; + return index; } @Override public ReplicationTimer getTimer() { - // TODO - return null; + return timer; } public Stage getStage() { return stage; } + protected void validateAndSetStage(Stage expected, Stage next) { + if (stage != expected) { + assert false : "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"; + throw new IllegalStateException( + "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" + ); + } + stage = next; + } + public void setStage(Stage stage) { - this.stage = stage; + switch (stage) { + case INIT: + this.stage = Stage.INIT; + getIndex().reset(); + break; + case REPLICATING: + validateAndSetStage(Stage.INIT, stage); + getIndex().start(); + break; + case DONE: + validateAndSetStage(Stage.REPLICATING, stage); + getIndex().stop(); + getTimer().stop(); + break; + default: + throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7933ea5f0344b..fb68e59f3b2ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,18 +8,40 @@ package org.opensearch.indices.replication; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; -import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; /** * Represents the target of a replication event. @@ -31,55 +53,52 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; private final SegmentReplicationSource source; private final SegmentReplicationState state; + protected final MultiFileWriter multiFileWriter; public SegmentReplicationTarget( ReplicationCheckpoint checkpoint, IndexShard indexShard, SegmentReplicationSource source, - SegmentReplicationTargetService.SegmentReplicationListener listener + ReplicationListener listener ) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; this.source = source; - this.state = new SegmentReplicationState(); + this.state = new SegmentReplicationState(stateIndex); + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); } @Override protected void closeInternal() { - // TODO + try { + multiFileWriter.close(); + } finally { + super.closeInternal(); + } } @Override protected String getPrefix() { - // TODO - return null; + return "replication." + UUIDs.randomBase64UUID() + "."; } @Override protected void onDone() { - this.state.setStage(SegmentReplicationState.Stage.DONE); + state.setStage(SegmentReplicationState.Stage.DONE); } @Override - protected void onCancel(String reason) { - // TODO - } - - @Override - public ReplicationState state() { + public SegmentReplicationState state() { return state; } - @Override - public ReplicationTarget retryCopy() { - // TODO - return null; + public SegmentReplicationTarget retryCopy() { + return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); } @Override public String description() { - // TODO - return null; + return "Segment replication from " + source.toString(); } @Override @@ -102,7 +121,12 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // TODO + try { + multiFileWriter.writeFileChunk(metadata, position, content, lastChunk); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } /** @@ -110,6 +134,127 @@ public void writeFileChunk( * @param listener {@link ActionListener} listener. */ public void startReplication(ActionListener listener) { - // TODO + state.setStage(SegmentReplicationState.Stage.REPLICATING); + final StepListener checkpointInfoListener = new StepListener<>(); + final StepListener getFilesListener = new StepListener<>(); + final StepListener finalizeListener = new StepListener<>(); + + // Get list of files to copy from this checkpoint. + source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener); + + checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure); + getFilesListener.whenComplete( + response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), + listener::onFailure + ); + finalizeListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); + } + + private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getFilesListener) + throws IOException { + final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot(); + Store.MetadataSnapshot localMetadata = getMetadataSnapshot(); + final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata); + logger.debug("Replication diff {}", diff); + // Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming snapshot + // from + // source that means the local copy of the segment has been corrupted/changed in some way and we throw an IllegalStateException to + // fail the shard + if (diff.different.isEmpty() == false) { + getFilesListener.onFailure( + new IllegalStateException( + new ParameterizedMessage("Shard {} has local copies of segments that differ from the primary", indexShard.shardId()) + .getFormattedMessage() + ) + ); + } + final List filesToFetch = new ArrayList(diff.missing); + + Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); + final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() + .stream() + .filter(f -> storeFiles.contains(f.name()) == false) + .collect(Collectors.toSet()); + + filesToFetch.addAll(pendingDeleteFiles); + + for (StoreFileMetadata file : filesToFetch) { + state.getIndex().addFileDetail(file.name(), file.length(), false); + } + if (filesToFetch.isEmpty()) { + getFilesListener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } else { + source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener); + } + } + + private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + multiFileWriter.renameAllTempFiles(); + final Store store = store(); + store.incRef(); + try { + // Deserialize the new SegmentInfos object sent from the primary. + final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + SegmentInfos infos = SegmentInfos.readCommit( + store.directory(), + toIndexInput(checkpointInfoResponse.getInfosBytes()), + responseCheckpoint.getSegmentsGen() + ); + indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } catch (Exception ex) { + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); + fail(rfe, true); + throw rfe; + } finally { + store.decRef(); + } + return null; + }); + } + + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); + } + + Store.MetadataSnapshot getMetadataSnapshot() throws IOException { + if (indexShard.getSegmentInfosSnapshot() == null) { + return Store.MetadataSnapshot.EMPTY; + } + return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java new file mode 100644 index 0000000000000..afdd0ce466f9b --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +/** + * Exception thrown if replication fails + * + * @opensearch.internal + */ +public class ReplicationFailedException extends OpenSearchException { + + public ReplicationFailedException(IndexShard shard, Throwable cause) { + this(shard, null, cause); + } + + public ReplicationFailedException(IndexShard shard, @Nullable String extraInfo, Throwable cause) { + this(shard.shardId(), extraInfo, cause); + } + + public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, Throwable cause) { + super(shardId + ": Replication failed on " + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); + } + + public ReplicationFailedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index f8dc07f122c02..27e23ceafb15e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -23,6 +23,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoveryTransportRequest; @@ -50,6 +51,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected final AtomicBoolean finished = new AtomicBoolean(); private final ShardId shardId; protected final IndexShard indexShard; + protected final Store store; protected final ReplicationListener listener; protected final Logger logger; protected final CancellableThreads cancellableThreads; @@ -59,7 +61,9 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected abstract void onDone(); - protected abstract void onCancel(String reason); + protected void onCancel(String reason) { + cancellableThreads.cancel(reason); + } public abstract ReplicationState state(); @@ -84,9 +88,11 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn this.id = ID_GENERATOR.incrementAndGet(); this.stateIndex = stateIndex; this.indexShard = indexShard; + this.store = indexShard.store(); this.shardId = indexShard.shardId(); // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); + store.incRef(); } public long getId() { @@ -119,6 +125,11 @@ public IndexShard indexShard() { return indexShard; } + public Store store() { + ensureRefCount(); + return store; + } + public ShardId shardId() { return shardId; } @@ -266,4 +277,8 @@ public abstract void writeFileChunk( int totalTranslogOps, ActionListener listener ); + + protected void closeInternal() { + store.decRef(); + } } diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 888e855176fe6..2046834fa9585 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -84,6 +84,7 @@ import org.opensearch.indices.InvalidIndexTemplateException; import org.opensearch.indices.recovery.PeerRecoveryNotFound; import org.opensearch.indices.recovery.RecoverFilesRecoveryException; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.ingest.IngestProcessorException; import org.opensearch.cluster.coordination.NodeHealthCheckFailureException; import org.opensearch.repositories.RepositoryException; @@ -849,6 +850,7 @@ public void testIds() { ids.put(158, PeerRecoveryNotFound.class); ids.put(159, NodeHealthCheckFailureException.class); ids.put(160, NoSeedNodeLeftException.class); + ids.put(161, ReplicationFailedException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index d99bde4764adf..b6bced9f038c0 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -31,7 +31,6 @@ package org.opensearch.index.store; -import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -51,7 +50,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -60,9 +58,12 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; -import org.apache.lucene.util.BytesRef; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.hamcrest.Matchers; import org.opensearch.ExceptionsHelper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; @@ -81,9 +82,8 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.test.DummyShardLock; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; -import org.hamcrest.Matchers; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -102,7 +102,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.unmodifiableMap; -import static org.opensearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -114,6 +113,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.test.VersionUtils.randomVersion; public class StoreTests extends OpenSearchTestCase { @@ -1149,4 +1149,43 @@ public void testGetMetadataWithSegmentInfos() throws IOException { assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name()); store.close(); } + + public void testcleanupAndPreserveLatestCommitPoint() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId)); + IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( + TestUtil.getDefaultCodec() + ); + indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); + IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig); + int docs = 1 + random().nextInt(100); + writer.commit(); + Document doc = new Document(); + doc.add(new TextField("id", "" + docs++, random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add( + new TextField( + "body", + TestUtil.randomRealisticUnicodeString(random()), + random().nextBoolean() ? Field.Store.YES : Field.Store.NO + ) + ); + doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); + writer.addDocument(doc); + writer.commit(); + writer.close(); + + Store.MetadataSnapshot commitMetadata = store.getMetadata(); + + Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; + + store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); + + // we want to ensure commitMetadata files are preserved after calling cleanup + for (String existingFile : store.directory().listAll()) { + assert (commitMetadata.contains(existingFile) == true); + } + + deleteContent(store.directory()); + IOUtils.close(store); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index aa17dec5767da..06b16b797efe3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -23,7 +23,10 @@ import java.io.IOException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -73,6 +76,8 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onResponse(null); return null; @@ -95,7 +100,7 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); + assertEquals(SegmentReplicationState.Stage.REPLICATING, state.getStage()); assertEquals(expectedError, e.getCause()); assertTrue(sendShardFailure); } @@ -103,6 +108,8 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onFailure(expectedError); return null; diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java new file mode 100644 index 0000000000000..a0944ee249859 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -0,0 +1,370 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.util.Version; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class SegmentReplicationTargetTests extends IndexShardTestCase { + + private SegmentReplicationTarget segrepTarget; + private IndexShard indexShard, spyIndexShard; + private ReplicationCheckpoint repCheckpoint; + private ByteBuffersDataOutput buffer; + + private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 5L, + "different", + Version.LATEST + ); + private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + + private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + null, + 0 + ); + + private static final Store.MetadataSnapshot SI_SNAPSHOT_DIFFERENT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE_DIFF.name(), SEGMENTS_FILE_DIFF), + null, + 0 + ); + + SegmentInfos testSegmentInfos; + + @Override + public void setUp() throws Exception { + + super.setUp(); + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + spyIndexShard = spy(indexShard); + + Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), anyLong()); + testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo(); + buffer = new ByteBuffersDataOutput(); + try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { + testSegmentInfos.write(indexOutput); + } + repCheckpoint = new ReplicationCheckpoint( + spyIndexShard.shardId(), + spyIndexShard.getPendingPrimaryTerm(), + testSegmentInfos.getGeneration(), + spyIndexShard.seqNoStats().getLocalCheckpoint(), + testSegmentInfos.version + ); + } + + public void testSuccessfulResponse_startReplication() { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + assertEquals(filesToFetch.size(), 2); + assert (filesToFetch.contains(SEGMENTS_FILE)); + assert (filesToFetch.contains(PENDING_DELETE_FILE)); + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + try { + verify(spyIndexShard, times(1)).finalizeReplication(any(), anyLong()); + } catch (IOException ex) { + Assert.fail(); + } + } + + @Override + public void onFailure(Exception e) { + logger.error("Unexpected test error", e); + Assert.fail(); + } + }); + } + + public void testFailureResponse_getCheckpointMetadata() { + + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onFailure(exception); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause().getCause()); + } + }); + } + + public void testFailureResponse_getSegmentFiles() { + + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onFailure(exception); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause().getCause()); + } + }); + } + + public void testFailure_finalizeReplication_IOException() throws IOException { + + IOException exception = new IOException("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + + public void testFailure_finalizeReplication_IndexFormatException() throws IOException { + + IndexFormatTooNewException exception = new IndexFormatTooNewException("string", 1, 2, 1); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + + public void testFailure_differentSegmentFiles() throws IOException { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); + when(segrepTarget.getMetadataSnapshot()).thenReturn(SI_SNAPSHOT_DIFFERENT); + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assert (e instanceof IllegalStateException); + } + }); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + segrepTarget.markAsDone(); + closeShards(spyIndexShard, indexShard); + } +}