From f6cb0925bc69a51dbcfdcc826718bd916a0d9a80 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Fri, 3 Jun 2022 14:14:29 -0700 Subject: [PATCH 1/6] implement segment replication target Signed-off-by: Poojita Raj --- .../org/opensearch/index/engine/Engine.java | 2 + .../index/engine/NRTReplicationEngine.java | 52 ++++++ .../opensearch/index/shard/IndexShard.java | 16 ++ .../org/opensearch/index/store/Store.java | 54 ++++++ .../indices/recovery/RecoveryTarget.java | 10 -- .../replication/SegmentReplicationState.java | 53 ++++-- .../replication/SegmentReplicationTarget.java | 163 ++++++++++++++++-- .../SegmentReplicationTargetService.java | 4 +- .../common/ReplicationFailedException.java | 60 +++++++ .../replication/common/ReplicationTarget.java | 13 +- ...eplicationLocalCheckpointTrackerTests.java | 86 +++++++++ .../SegmentReplicationTargetTests.java | 12 ++ 12 files changed, 488 insertions(+), 37 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java create mode 100644 server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 4829148322b31..bcac0e5d13a7d 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -271,6 +271,8 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } + public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {} + /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index e4f4bbbba8f16..7ec93fcfa094a 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -23,6 +23,7 @@ import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; @@ -480,4 +481,55 @@ private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConf ); } + public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) + throws IOException { + assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos"; + + store.incRef(); + try { + refreshLastCommittedSegmentInfos(); + // clean up the local store of old segment files + // and validate the latest segment infos against the snapshot sent from the primary shard. + store.cleanupAndVerify( + "finalize - clean with in memory infos", + expectedMetadata, + store.getMetadata(infos), + store.getMetadata(lastCommittedSegmentInfos) + ); + } finally { + store.decRef(); + } + // Update the current infos reference on the Engine's reader. 
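+        // then fast-forward the processed checkpoint and refresh the reader so the new segments become searchable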
+ readerManager.updateSegments(infos); + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + readerManager.maybeRefresh(); + } + + private void refreshLastCommittedSegmentInfos() { + /* + * we have to inc-ref the store here since if the engine is closed by a tragic event + * we don't acquire the write lock and wait until we have exclusive access. This might also + * dec the store reference which can essentially close the store and unless we can inc the reference + * we can't use it. + */ + store.incRef(); + try { + // reread the last committed segment infos + lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); + } catch (Exception e) { + if (isClosed.get() == false) { + try { + logger.warn("failed to read latest segment infos on flush", e); + } catch (Exception inner) { + e.addSuppressed(inner); + } + if (Lucene.isCorruptionException(e)) { + throw new FlushFailedEngineException(shardId, e); + } + } + } finally { + store.decRef(); + } + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index bad412003df26..812ce8f518896 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -109,6 +109,7 @@ import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineException; import org.opensearch.index.engine.EngineFactory; +import org.opensearch.index.engine.NRTReplicationEngine; import org.opensearch.index.engine.ReadOnlyEngine; import org.opensearch.index.engine.RefreshFailedEngineException; import org.opensearch.index.engine.SafeCommitInfo; @@ -1363,6 +1364,21 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException { + if (getEngine() instanceof NRTReplicationEngine) { + getEngine().finalizeReplication(infos, expectedMetadata, seqNo); + } + } + + /** + * Fetch a snapshot of the latest SegmentInfos held by the engine. + * @return {@link SegmentInfos} + */ + public SegmentInfos getLatestSegmentInfos() { + // TODO: implement + return null; + } + /** * Snapshots the most recent safe index commit from the currently running engine. * All index files referenced by this index commit won't be freed until the commit/snapshot is closed. diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index f818456c3a2c8..4013fa1e6d013 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -64,6 +64,7 @@ import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.Version; import org.opensearch.ExceptionsHelper; +import org.opensearch.common.Nullable; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; @@ -706,6 +707,59 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr } } + /** + * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. + * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. + * In this case files from both snapshots must be preserved. 
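+     * Files referenced by the latest on-disk commit point are also preserved, even if they appear in neither snapshot.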
* @param reason the reason for this cleanup operation logged for each deleted file
+     * @param remoteSnapshot The remote snapshot sent from primary shards.
+     * @param localSnapshot The local snapshot from in memory SegmentInfos.
+     * @param latestCommitPointMetadata The snapshot of the latest commit point on disk, whose files are preserved as well.
+     * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
+     */
+    public void cleanupAndVerify(
+        String reason,
+        MetadataSnapshot remoteSnapshot,
+        MetadataSnapshot localSnapshot,
+        MetadataSnapshot latestCommitPointMetadata
+    ) throws IOException {
+        // fetch a snapshot from the latest on disk Segments_N file. This can be behind
+        // the passed in local in memory snapshot, so we want to ensure files it references are not removed.
+        metadataLock.writeLock().lock();
+        try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
+            cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata);
+            verifyAfterCleanup(remoteSnapshot, localSnapshot);
+        } finally {
+            metadataLock.writeLock().unlock();
+        }
+    }
+
+    private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot)
+        throws IOException {
+        assert metadataLock.isWriteLockedByCurrentThread();
+        for (String existingFile : directory.listAll()) {
+            if (Store.isAutogenerated(existingFile)
+                || remoteSnapshot.contains(existingFile)
+                || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) {
+                // don't delete snapshot files, or the checksums file (note, this is extra protection since the Store won't delete
+                // checksums)
+                continue;
+            }
+            try {
+                directory.deleteFile(reason, existingFile);
+                // a FileNotFoundException should not happen here since we hold the write lock
+            } catch (IOException ex) {
+                if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) {
+                    // TODO do we need to also fail this if we can't delete the pending commit file?
+                    // if one of those files can't be deleted we better fail the cleanup, otherwise we might leave an old commit
+                    // point around.
+ throw new IllegalStateException("Can't delete " + existingFile + " - cleanup failed", ex); + } + logger.debug(() -> new ParameterizedMessage("failed to delete file [{}]", existingFile), ex); + // ignore, we don't really care, will get deleted later on + } + } + } + // pkg private for testing final void verifyAfterCleanup(MetadataSnapshot sourceMetadata, MetadataSnapshot targetMetadata) { final RecoveryDiff recoveryDiff = targetMetadata.recoveryDiff(sourceMetadata); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 1735bb015c90c..185434fb94a3c 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -77,7 +77,6 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH private static final String RECOVERY_PREFIX = "recovery."; private final DiscoveryNode sourceNode; - private final CancellableThreads cancellableThreads; protected final MultiFileWriter multiFileWriter; protected final Store store; @@ -93,7 +92,6 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH */ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, ReplicationListener listener) { super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); - this.cancellableThreads = new CancellableThreads(); this.sourceNode = sourceNode; indexShard.recoveryStats().incCurrentAsTarget(); this.store = indexShard.store(); @@ -258,14 +256,6 @@ protected void onDone() { indexShard.postRecovery("peer recovery done"); } - /** - * if {@link #cancellableThreads()} was used, the threads will be interrupted. - */ - @Override - protected void onCancel(String reason) { - cancellableThreads.cancel(reason); - } - /*** Implementation of {@link RecoveryTargetHandler } */ @Override diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index b01016d2a1e62..dff9c2dcec018 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -25,9 +25,11 @@ public class SegmentReplicationState implements ReplicationState { * @opensearch.internal */ public enum Stage { - DONE((byte) 0), + INIT((byte) 1), - INIT((byte) 1); + REPLICATING((byte) 1), + + DONE((byte) 2); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -56,29 +58,58 @@ public static Stage fromId(byte id) { } } - public SegmentReplicationState() { - this.stage = Stage.INIT; - } - private Stage stage; + private final ReplicationLuceneIndex index; + private final ReplicationTimer timer; + + public SegmentReplicationState(ReplicationLuceneIndex index) { + stage = Stage.DONE; + this.index = index; + timer = new ReplicationTimer(); + timer.start(); + } @Override public ReplicationLuceneIndex getIndex() { - // TODO - return null; + return index; } @Override public ReplicationTimer getTimer() { - // TODO - return null; + return timer; } public Stage getStage() { return stage; } + protected void validateAndSetStage(Stage expected, Stage next) { + if (stage != expected) { + assert false : "can't move recovery to stage [" + next + "]. 
current stage: [" + stage + "] (expected [" + expected + "])"; + throw new IllegalStateException( + "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" + ); + } + stage = next; + } + public void setStage(Stage stage) { - this.stage = stage; + switch (stage) { + case INIT: + this.stage = Stage.INIT; + getIndex().reset(); + break; + case REPLICATING: + validateAndSetStage(Stage.INIT, stage); + getIndex().start(); + break; + case DONE: + validateAndSetStage(Stage.REPLICATING, stage); + getIndex().stop(); + getTimer().stop(); + break; + default: + throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]"); + } } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 7933ea5f0344b..00cb8a34184fe 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,18 +8,40 @@ package org.opensearch.indices.replication; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.BufferedChecksumIndexInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersIndexInput; +import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.StepListener; +import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardState; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.recovery.MultiFileWriter; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; -import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationTarget; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Represents the target of a replication event. 
@@ -31,6 +53,7 @@ public class SegmentReplicationTarget extends ReplicationTarget { private final ReplicationCheckpoint checkpoint; private final SegmentReplicationSource source; private final SegmentReplicationState state; + protected final MultiFileWriter multiFileWriter; public SegmentReplicationTarget( ReplicationCheckpoint checkpoint, @@ -41,32 +64,38 @@ public SegmentReplicationTarget( super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; this.source = source; - this.state = new SegmentReplicationState(); + this.state = new SegmentReplicationState(stateIndex); + this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, getPrefix(), logger, this::ensureRefCount); } @Override protected void closeInternal() { - // TODO + try { + multiFileWriter.close(); + } finally { + // free store. increment happens in constructor + store.decRef(); + } } @Override protected String getPrefix() { - // TODO - return null; + return "replication." + UUIDs.randomBase64UUID() + "."; } @Override protected void onDone() { - this.state.setStage(SegmentReplicationState.Stage.DONE); + state.setStage(SegmentReplicationState.Stage.DONE); } @Override - protected void onCancel(String reason) { - // TODO + public void onCancel(String reason) { + state.setStage(SegmentReplicationState.Stage.DONE); + cancellableThreads.cancel(reason); } @Override - public ReplicationState state() { + public SegmentReplicationState state() { return state; } @@ -78,8 +107,7 @@ public ReplicationTarget retryCopy() { @Override public String description() { - // TODO - return null; + return "Segment replication from " + source.toString(); } @Override @@ -102,14 +130,121 @@ public void writeFileChunk( int totalTranslogOps, ActionListener listener ) { - // TODO + try { + multiFileWriter.writeFileChunk(metadata, position, content, lastChunk); + listener.onResponse(null); + } catch (Exception e) { + listener.onFailure(e); + } } /** * Start the Replication event. * @param listener {@link ActionListener} listener. */ - public void startReplication(ActionListener listener) { - // TODO + public void startReplication(ActionListener listener) { + final StepListener checkpointInfoListener = new StepListener<>(); + final StepListener getFilesListener = new StepListener<>(); + final StepListener finalizeListener = new StepListener<>(); + + // Get list of files to copy from this checkpoint. 
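+        // The three steps below (checkpoint metadata, file copy, finalize) are chained through the StepListeners.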
+        source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
+
+        checkpointInfoListener.whenComplete(checkpointInfo -> getFiles(checkpointInfo, getFilesListener), listener::onFailure);
+        getFilesListener.whenComplete(
+            response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener),
+            listener::onFailure
+        );
+        finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure);
+    }
+
+    private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
+        throws IOException {
+        final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
+        Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
+        final Store.RecoveryDiff diff = snapshot.recoveryDiff(localMetadata);
+        logger.debug("Recovery diff {}", diff);
+        final List<StoreFileMetadata> filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream())
+            .collect(Collectors.toList());
+
+        Set<String> storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll()));
+        final Set<StoreFileMetadata> pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles()
+            .stream()
+            .filter(f -> storeFiles.contains(f.name()) == false)
+            .collect(Collectors.toSet());
+
+        filesToFetch.addAll(pendingDeleteFiles);
+
+        for (StoreFileMetadata file : filesToFetch) {
+            state.getIndex().addFileDetail(file.name(), file.length(), false);
+        }
+        if (filesToFetch.isEmpty()) {
+            // nothing to fetch - complete the listener directly instead of issuing an empty request,
+            // and return so it is not completed a second time below
+            getFilesListener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+            return;
+        }
+        source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
+    }
+
+    private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
+        ActionListener.completeWith(listener, () -> {
+            // first, we go and move files that were created with the recovery id suffix to
+            // the actual names, its ok if we have a corrupted index here, since we have replicas
+            // to recover from in case of a full cluster shutdown just when this code executes...
+            multiFileWriter.renameAllTempFiles();
+            final Store store = store();
+            store.incRef();
+            try {
+                // Deserialize the new SegmentInfos object sent from the primary.
+                final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint();
+                SegmentInfos infos = SegmentInfos.readCommit(
+                    store.directory(),
+                    toIndexInput(checkpointInfoResponse.getInfosBytes()),
+                    responseCheckpoint.getSegmentsGen()
+                );
+                indexShard.finalizeReplication(infos, checkpointInfoResponse.getSnapshot(), responseCheckpoint.getSeqNo());
+            } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
+                // this is a fatal exception at this stage.
+                // this means we transferred files from the remote that have not been checksummed and they are
+                // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
+                // source shard since this index might be broken there as well? The Source can handle this and checks
+                // its content on disk if possible.
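+                // best effort: remove the corruption marker and wipe the local index so the shard can be recovered from scratch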
+ try { + try { + store.removeCorruptionMarker(); + } finally { + Lucene.cleanLuceneIndex(store.directory()); // clean up and delete all files + } + } catch (Exception e) { + logger.debug("Failed to clean lucene index", e); + ex.addSuppressed(e); + } + ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after recovery", ex); + fail(rfe, true); + throw rfe; + } catch (Exception ex) { + ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after recovery", ex); + fail(rfe, true); + throw rfe; + } finally { + store.decRef(); + } + return null; + }); + } + + /** + * This method formats our byte[] containing the primary's SegmentInfos into lucene's {@link ChecksumIndexInput} that can be + * passed to SegmentInfos.readCommit + */ + private ChecksumIndexInput toIndexInput(byte[] input) { + return new BufferedChecksumIndexInput( + new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(ByteBuffer.wrap(input))), "SegmentInfos") + ); + } + + private Store.MetadataSnapshot getMetadataSnapshot() throws IOException { + if (indexShard.state().equals(IndexShardState.STARTED) == false) { + return Store.MetadataSnapshot.EMPTY; + } + return store.getMetadata(indexShard.getLatestSegmentInfos()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 1c6053a72a4c5..32f67719b4300 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -13,6 +13,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; +import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; @@ -93,6 +94,7 @@ public void startReplication( } public void startReplication(final SegmentReplicationTarget target) { + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId)); threadPool.generic().execute(new ReplicationRunner(replicationId)); @@ -141,7 +143,7 @@ private void start(final long replicationId) { try (ReplicationRef replicationRef = onGoingReplications.get(replicationId)) { replicationRef.get().startReplication(new ActionListener<>() { @Override - public void onResponse(Void o) { + public void onResponse(ReplicationResponse o) { onGoingReplications.markAsDone(replicationId); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java new file mode 100644 index 0000000000000..15b4e43ba76d0 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices.replication.common; + +import org.opensearch.OpenSearchException; +import org.opensearch.common.Nullable; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.ShardId; + +import java.io.IOException; + +public class ReplicationFailedException extends OpenSearchException { + + public ReplicationFailedException(IndexShard shard, Throwable cause) { + this(shard, null, cause); + } + + public ReplicationFailedException(IndexShard shard, @Nullable String extraInfo, Throwable cause) { + this(shard.shardId(), extraInfo, cause); + } + + public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, Throwable cause) { + super(shardId + ": Replication failed " + ("on ") + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); + } + + public ReplicationFailedException(StreamInput in) throws IOException { + super(in); + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index f8dc07f122c02..1aabe693c8937 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -23,6 +23,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.recovery.RecoveryTransportRequest; @@ -50,6 +51,7 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected final AtomicBoolean finished = new AtomicBoolean(); private final ShardId shardId; protected final IndexShard indexShard; + protected final Store store; protected final ReplicationListener listener; protected final Logger logger; protected final CancellableThreads cancellableThreads; @@ -59,7 +61,9 @@ public abstract class ReplicationTarget extends AbstractRefCounted { protected abstract void onDone(); - protected abstract void onCancel(String reason); + protected void onCancel(String reason) { + cancellableThreads.cancel(reason); + } public abstract ReplicationState state(); @@ -84,9 +88,11 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn this.id = ID_GENERATOR.incrementAndGet(); this.stateIndex = stateIndex; this.indexShard = indexShard; + this.store = 
indexShard.store(); this.shardId = indexShard.shardId(); // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); + store.incRef(); } public long getId() { @@ -119,6 +125,11 @@ public IndexShard indexShard() { return indexShard; } + public Store store() { + ensureRefCount(); + return store; + } + public ShardId shardId() { return shardId; } diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java new file mode 100644 index 0000000000000..0321ea23b1438 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.seqno; + +import org.junit.Before; +import org.opensearch.test.OpenSearchTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase { + + private LocalCheckpointTracker tracker; + + public static LocalCheckpointTracker createEmptyTracker() { + return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + tracker = createEmptyTracker(); + } + + public void testSimpleSegrepProcessedNoPersistentUpdate() { + // base case with no persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + } + + public void testSimpleSegrepProcessedPersistentUpdate() { + // base case with persistent checkpoint update + long seqNo1; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + + tracker.markSeqNoAsPersisted(seqNo1); + assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + // idempotent case + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); + assertThat(tracker.hasProcessed(0L), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); + + } + + public void testSimpleSegrepProcessedPersistentUpdate2() { + long seqNo1, seqNo2; + assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); + seqNo1 = tracker.generateSeqNo(); + seqNo2 = tracker.generateSeqNo(); + assertThat(seqNo1, equalTo(0L)); + assertThat(seqNo2, equalTo(1L)); + tracker.markSeqNoAsPersisted(seqNo1); + tracker.markSeqNoAsPersisted(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); + assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); + + tracker.fastForwardProcessedSeqNo(seqNo2); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(seqNo1), 
equalTo(true)); + assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); + + tracker.fastForwardProcessedSeqNo(seqNo1); + assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); + assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); + assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); + assertThat(tracker.getMaxSeqNo(), equalTo(1L)); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java new file mode 100644 index 0000000000000..1ab01e1254816 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +public class SegmentReplicationTargetTests { +} From 704a1094499990c11c334b2b00ec35abac3de679 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 7 Jun 2022 15:18:38 -0700 Subject: [PATCH 2/6] test added Signed-off-by: Poojita Raj --- .../indices/recovery/RecoveryTarget.java | 8 - .../replication/SegmentReplicationState.java | 2 +- .../replication/SegmentReplicationTarget.java | 14 +- .../SegmentReplicationTargetService.java | 4 +- .../SegmentReplicationTargetServiceTests.java | 11 +- .../SegmentReplicationTargetTests.java | 246 +++++++++++++++++- 6 files changed, 262 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 185434fb94a3c..426409f7a5b65 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -78,7 +78,6 @@ public class RecoveryTarget extends ReplicationTarget implements RecoveryTargetH private final DiscoveryNode sourceNode; protected final MultiFileWriter multiFileWriter; - protected final Store store; // latch that can be used to blockingly wait for RecoveryTarget to be closed private final CountDownLatch closedLatch = new CountDownLatch(1); @@ -94,10 +93,8 @@ public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, Replicati super("recovery_status", indexShard, indexShard.recoveryState().getIndex(), listener); this.sourceNode = sourceNode; indexShard.recoveryStats().incCurrentAsTarget(); - this.store = indexShard.store(); final String tempFilePrefix = getPrefix() + UUIDs.randomBase64UUID() + "."; this.multiFileWriter = new MultiFileWriter(indexShard.store(), stateIndex, tempFilePrefix, logger, this::ensureRefCount); - store.incRef(); } /** @@ -130,11 +127,6 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public Store store() { - ensureRefCount(); - return store; - } - public String description() { return "recovery from " + source(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index dff9c2dcec018..48fe134254d26 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -63,7 +63,7 @@ public static Stage 
fromId(byte id) { private final ReplicationTimer timer; public SegmentReplicationState(ReplicationLuceneIndex index) { - stage = Stage.DONE; + stage = Stage.INIT; this.index = index; timer = new ReplicationTimer(); timer.start(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 00cb8a34184fe..9fb27508d2e56 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -19,17 +19,15 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; -import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.util.CancellableThreads; import org.opensearch.index.shard.IndexShard; -import org.opensearch.index.shard.IndexShardState; import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.recovery.MultiFileWriter; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTarget; @@ -90,7 +88,6 @@ protected void onDone() { @Override public void onCancel(String reason) { - state.setStage(SegmentReplicationState.Stage.DONE); cancellableThreads.cancel(reason); } @@ -101,8 +98,8 @@ public SegmentReplicationState state() { @Override public ReplicationTarget retryCopy() { - // TODO return null; + // return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); } @Override @@ -142,7 +139,8 @@ public void writeFileChunk( * Start the Replication event. * @param listener {@link ActionListener} listener. 
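+     * The listener is completed once the copied segments have been finalized on the replica.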
*/ - public void startReplication(ActionListener listener) { + public void startReplication(ActionListener listener) { + state.setStage(SegmentReplicationState.Stage.REPLICATING); final StepListener checkpointInfoListener = new StepListener<>(); final StepListener getFilesListener = new StepListener<>(); final StepListener finalizeListener = new StepListener<>(); @@ -155,7 +153,7 @@ public void startReplication(ActionListener listener) { response -> finalizeReplication(checkpointInfoListener.result(), finalizeListener), listener::onFailure ); - finalizeListener.whenComplete(r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); + finalizeListener.whenComplete(r -> listener.onResponse(null), listener::onFailure); } private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener getFilesListener) @@ -242,7 +240,7 @@ private ChecksumIndexInput toIndexInput(byte[] input) { } private Store.MetadataSnapshot getMetadataSnapshot() throws IOException { - if (indexShard.state().equals(IndexShardState.STARTED) == false) { + if (indexShard.getLatestSegmentInfos() == null) { return Store.MetadataSnapshot.EMPTY; } return store.getMetadata(indexShard.getLatestSegmentInfos()); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 32f67719b4300..1c6053a72a4c5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -13,7 +13,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.support.replication.ReplicationResponse; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.index.shard.IndexEventListener; @@ -94,7 +93,6 @@ public void startReplication( } public void startReplication(final SegmentReplicationTarget target) { - target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final long replicationId = onGoingReplications.start(target, recoverySettings.activityTimeout()); logger.trace(() -> new ParameterizedMessage("Starting replication {}", replicationId)); threadPool.generic().execute(new ReplicationRunner(replicationId)); @@ -143,7 +141,7 @@ private void start(final long replicationId) { try (ReplicationRef replicationRef = onGoingReplications.get(replicationId)) { replicationRef.get().startReplication(new ActionListener<>() { @Override - public void onResponse(ReplicationResponse o) { + public void onResponse(Void o) { onGoingReplications.markAsDone(replicationId); } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index aa17dec5767da..06b16b797efe3 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -23,7 +23,10 @@ import java.io.IOException; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; 
+import static org.mockito.Mockito.when; public class SegmentReplicationTargetServiceTests extends IndexShardTestCase { @@ -73,6 +76,8 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onResponse(null); return null; @@ -95,7 +100,7 @@ public void onReplicationDone(SegmentReplicationState state) { @Override public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertEquals(SegmentReplicationState.Stage.INIT, state.getStage()); + assertEquals(SegmentReplicationState.Stage.REPLICATING, state.getStage()); assertEquals(expectedError, e.getCause()); assertTrue(sendShardFailure); } @@ -103,6 +108,8 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept ); final SegmentReplicationTarget spy = Mockito.spy(target); doAnswer(invocation -> { + // setting stage to REPLICATING so transition in markAsDone succeeds on listener completion + target.state().setStage(SegmentReplicationState.Stage.REPLICATING); final ActionListener listener = invocation.getArgument(0); listener.onFailure(expectedError); return null; diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 1ab01e1254816..32f489e28a4da 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -8,5 +8,249 @@ package org.opensearch.indices.replication; -public class SegmentReplicationTargetTests { +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.apache.lucene.util.Version; +import org.junit.Assert; +import org.mockito.Mockito; +import org.opensearch.action.ActionListener; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.IndexShardTestCase; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class SegmentReplicationTargetTests extends IndexShardTestCase { + + SegmentReplicationTarget segrepTarget; + IndexShard indexShard, spyIndexShard; + ReplicationCheckpoint repCheckpoint; + ByteBuffersDataOutput buffer; + + private static final StoreFileMetadata SEGMENTS_FILE 
= new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); + + private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE.name(), SEGMENTS_FILE), + null, + 0 + ); + + SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); + + @Override + public void setUp() throws Exception { + + super.setUp(); + buffer = new ByteBuffersDataOutput(); + try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { + testSegmentInfos.write(indexOutput); + } + + repCheckpoint = mock(ReplicationCheckpoint.class); + Settings indexSettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build(); + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); + spyIndexShard = spy(indexShard); + Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), any(), anyLong()); + } + + public void testSuccessfulResponse_startReplication() { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + try { + verify(spyIndexShard, times(1)).finalizeReplication(any(), any(), anyLong()); + } catch (IOException ex) { + Assert.fail(); + } + } + + @Override + public void onFailure(Exception e) { + Assert.fail(); + } + }); + } + + public void testFailureResponse_getCheckpointMetadata() { + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onFailure(exception); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, 
e.getCause().getCause()); + } + }); + } + + public void testFailureResponse_getSegmentFiles() { + Exception exception = new Exception("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onFailure(exception); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause().getCause()); + } + }); + } + + public void testFailure_finalizeReplication() throws IOException { + IOException exception = new IOException("dummy failure"); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + + @Override + public void tearDown() throws Exception { + logger.info(indexShard.store()); + super.tearDown(); + segrepTarget.markAsDone(); + closeShards(spyIndexShard, indexShard); + } } From ed3fe28a3b968cec9ee54091dafa04445859be1f Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 9 Jun 2022 10:10:39 -0700 Subject: [PATCH 3/6] changes to tests + finalizeReplication Signed-off-by: Poojita Raj --- .../org/opensearch/index/engine/Engine.java | 2 - .../index/engine/NRTReplicationEngine.java | 54 +---------- .../opensearch/index/shard/IndexShard.java | 16 +--- .../org/opensearch/index/store/Store.java | 12 +-- .../replication/SegmentReplicationState.java | 10 +- .../replication/SegmentReplicationTarget.java | 43 ++++----- .../replication/common/ReplicationTarget.java | 4 + ...eplicationLocalCheckpointTrackerTests.java | 86 ----------------- .../SegmentReplicationTargetTests.java | 92 +++++++++++++++---- 9 files changed, 116 insertions(+), 203 deletions(-) delete mode 100644 
server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index bcac0e5d13a7d..4829148322b31 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -271,8 +271,6 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException { } } - public void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) throws IOException {} - /** * A throttling class that can be activated, causing the * {@code acquireThrottle} method to block on a lock when throttling diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 7ec93fcfa094a..29d85271ba486 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -23,7 +23,6 @@ import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.seqno.SequenceNumbers; -import org.opensearch.index.store.Store; import org.opensearch.index.translog.DefaultTranslogDeletionPolicy; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; @@ -86,6 +85,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. + assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos"; readerManager.updateSegments(infos); // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher @@ -95,6 +95,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th rollTranslogGeneration(); } localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); + readerManager.maybeRefresh(); } @Override @@ -481,55 +482,4 @@ private TranslogDeletionPolicy getTranslogDeletionPolicy(EngineConfig engineConf ); } - public synchronized void finalizeReplication(SegmentInfos infos, Store.MetadataSnapshot expectedMetadata, long seqNo) - throws IOException { - assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos"; - - store.incRef(); - try { - refreshLastCommittedSegmentInfos(); - // clean up the local store of old segment files - // and validate the latest segment infos against the snapshot sent from the primary shard. - store.cleanupAndVerify( - "finalize - clean with in memory infos", - expectedMetadata, - store.getMetadata(infos), - store.getMetadata(lastCommittedSegmentInfos) - ); - } finally { - store.decRef(); - } - // Update the current infos reference on the Engine's reader. - readerManager.updateSegments(infos); - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); - readerManager.maybeRefresh(); - } - - private void refreshLastCommittedSegmentInfos() { - /* - * we have to inc-ref the store here since if the engine is closed by a tragic event - * we don't acquire the write lock and wait until we have exclusive access. This might also - * dec the store reference which can essentially close the store and unless we can inc the reference - * we can't use it. 
- */ - store.incRef(); - try { - // reread the last committed segment infos - lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); - } catch (Exception e) { - if (isClosed.get() == false) { - try { - logger.warn("failed to read latest segment infos on flush", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - if (Lucene.isCorruptionException(e)) { - throw new FlushFailedEngineException(shardId, e); - } - } - } finally { - store.decRef(); - } - } - } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 812ce8f518896..858eae2829285 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1364,19 +1364,9 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } - public void finalizeReplication(SegmentInfos infos, MetadataSnapshot expectedMetadata, long seqNo) throws IOException { - if (getEngine() instanceof NRTReplicationEngine) { - getEngine().finalizeReplication(infos, expectedMetadata, seqNo); - } - } - - /** - * Fetch a snapshot of the latest SegmentInfos held by the engine. - * @return {@link SegmentInfos} - */ - public SegmentInfos getLatestSegmentInfos() { - // TODO: implement - return null; + public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { + assert getEngine() instanceof NRTReplicationEngine; + ((NRTReplicationEngine) getEngine()).updateSegments(infos, seqNo); } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 4013fa1e6d013..86c9995c268e9 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -710,24 +710,20 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr /** * This method deletes every file in this store that is not contained in either the remote or local metadata snapshots. * This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file. - * In this case files from both snapshots must be preserved. + * In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk. * @param reason the reason for this cleanup operation logged for each deleted file - * @param remoteSnapshot The remote snapshot sent from primary shards. * @param localSnapshot The local snapshot from in memory SegmentInfos. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndVerify( + public void cleanupAndPreserveLatestCommitPoint( String reason, - MetadataSnapshot remoteSnapshot, - MetadataSnapshot localSnapshot, - MetadataSnapshot latestCommitPointMetadata + MetadataSnapshot localSnapshot ) throws IOException { // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. 
metadataLock.writeLock().lock(); try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) { - cleanupFiles(reason, remoteSnapshot, latestCommitPointMetadata); - verifyAfterCleanup(remoteSnapshot, localSnapshot); + cleanupFiles(reason, localSnapshot, getMetadata(readLastCommittedSegmentsInfo())); } finally { metadataLock.writeLock().unlock(); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java index 48fe134254d26..838c06a4785ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java @@ -25,11 +25,11 @@ public class SegmentReplicationState implements ReplicationState { * @opensearch.internal */ public enum Stage { - INIT((byte) 1), + DONE((byte) 0), - REPLICATING((byte) 1), + INIT((byte) 1), - DONE((byte) 2); + REPLICATING((byte) 2); private static final Stage[] STAGES = new Stage[Stage.values().length]; @@ -85,9 +85,9 @@ public Stage getStage() { protected void validateAndSetStage(Stage expected, Stage next) { if (stage != expected) { - assert false : "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"; + assert false : "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"; throw new IllegalStateException( - "can't move recovery to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" + "can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])" ); } stage = next; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 9fb27508d2e56..05fcbe880862b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -8,6 +8,7 @@ package org.opensearch.indices.replication; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; @@ -29,6 +30,7 @@ import org.opensearch.indices.recovery.MultiFileWriter; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTarget; @@ -57,7 +59,7 @@ public SegmentReplicationTarget( ReplicationCheckpoint checkpoint, IndexShard indexShard, SegmentReplicationSource source, - SegmentReplicationTargetService.SegmentReplicationListener listener + ReplicationListener listener ) { super("replication_target", indexShard, new ReplicationLuceneIndex(), listener); this.checkpoint = checkpoint; @@ -72,7 +74,7 @@ protected void closeInternal() { multiFileWriter.close(); } finally { // free store. 
increment happens in constructor - store.decRef(); + super.closeInternal(); } } @@ -86,20 +88,13 @@ protected void onDone() { state.setStage(SegmentReplicationState.Stage.DONE); } - @Override - public void onCancel(String reason) { - cancellableThreads.cancel(reason); - } - @Override public SegmentReplicationState state() { return state; } - @Override - public ReplicationTarget retryCopy() { - return null; - // return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); + public SegmentReplicationTarget retryCopy() { + return new SegmentReplicationTarget(checkpoint, indexShard, source, listener); } @Override @@ -161,7 +156,10 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) .collect(Collectors.toList()); @@ -184,21 +182,24 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { ActionListener.completeWith(listener, () -> { - // first, we go and move files that were created with the recovery id suffix to - // the actual names, its ok if we have a corrupted index here, since we have replicas - // to recover from in case of a full cluster shutdown just when this code executes... multiFileWriter.renameAllTempFiles(); final Store store = store(); store.incRef(); try { // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); + System.out.println(responseCheckpoint.getSegmentsGen()); SegmentInfos infos = SegmentInfos.readCommit( store.directory(), toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); - indexShard.finalizeReplication(infos, checkpointInfoResponse.getSnapshot(), responseCheckpoint.getSeqNo()); + indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); + store.cleanupAndPreserveLatestCommitPoint( + "finalize - clean with in memory infos", + store.getMetadata(infos) + ); + //method/function that checks if some segment doesn't match with that of primary we } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. 
// this means we transferred files from the remote that have not be checksummed and they are @@ -215,11 +216,11 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, logger.debug("Failed to clean lucene index", e); ex.addSuppressed(e); } - ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after recovery", ex); + ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex); fail(rfe, true); throw rfe; } catch (Exception ex) { - ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after recovery", ex); + ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex); fail(rfe, true); throw rfe; } finally { @@ -239,10 +240,10 @@ private ChecksumIndexInput toIndexInput(byte[] input) { ); } - private Store.MetadataSnapshot getMetadataSnapshot() throws IOException { - if (indexShard.getLatestSegmentInfos() == null) { + Store.MetadataSnapshot getMetadataSnapshot() throws IOException { + if (indexShard.getSegmentInfosSnapshot() == null) { return Store.MetadataSnapshot.EMPTY; } - return store.getMetadata(indexShard.getLatestSegmentInfos()); + return store.getMetadata(indexShard.getSegmentInfosSnapshot().get()); } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 1aabe693c8937..27e23ceafb15e 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -277,4 +277,8 @@ public abstract void writeFileChunk( int totalTranslogOps, ActionListener listener ); + + protected void closeInternal() { + store.decRef(); + } } diff --git a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java b/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java deleted file mode 100644 index 0321ea23b1438..0000000000000 --- a/server/src/test/java/org/opensearch/index/seqno/SegmentReplicationLocalCheckpointTrackerTests.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. 
- */ - -package org.opensearch.index.seqno; - -import org.junit.Before; -import org.opensearch.test.OpenSearchTestCase; - -import static org.hamcrest.Matchers.equalTo; - -public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase { - - private LocalCheckpointTracker tracker; - - public static LocalCheckpointTracker createEmptyTracker() { - return new LocalCheckpointTracker(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - tracker = createEmptyTracker(); - } - - public void testSimpleSegrepProcessedNoPersistentUpdate() { - // base case with no persistent checkpoint update - long seqNo1; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); - } - - public void testSimpleSegrepProcessedPersistentUpdate() { - // base case with persistent checkpoint update - long seqNo1; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - - tracker.markSeqNoAsPersisted(seqNo1); - assertThat(tracker.getPersistedCheckpoint(), equalTo(0L)); - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); - assertThat(tracker.hasProcessed(0L), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); - - // idempotent case - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(0L)); - assertThat(tracker.hasProcessed(0L), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(1)), equalTo(false)); - - } - - public void testSimpleSegrepProcessedPersistentUpdate2() { - long seqNo1, seqNo2; - assertThat(tracker.getProcessedCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED)); - seqNo1 = tracker.generateSeqNo(); - seqNo2 = tracker.generateSeqNo(); - assertThat(seqNo1, equalTo(0L)); - assertThat(seqNo2, equalTo(1L)); - tracker.markSeqNoAsPersisted(seqNo1); - tracker.markSeqNoAsPersisted(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(-1L)); - assertThat(tracker.getPersistedCheckpoint(), equalTo(1L)); - - tracker.fastForwardProcessedSeqNo(seqNo2); - assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); - assertThat(tracker.hasProcessed(seqNo1), equalTo(true)); - assertThat(tracker.hasProcessed(seqNo2), equalTo(true)); - - tracker.fastForwardProcessedSeqNo(seqNo1); - assertThat(tracker.getProcessedCheckpoint(), equalTo(1L)); - assertThat(tracker.hasProcessed(between(0, 1)), equalTo(true)); - assertThat(tracker.hasProcessed(atLeast(2)), equalTo(false)); - assertThat(tracker.getMaxSeqNo(), equalTo(1L)); - } -} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 32f489e28a4da..e2308173f4c8b 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -38,15 +38,17 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class 
SegmentReplicationTargetTests extends IndexShardTestCase { - SegmentReplicationTarget segrepTarget; - IndexShard indexShard, spyIndexShard; - ReplicationCheckpoint repCheckpoint; - ByteBuffersDataOutput buffer; + private SegmentReplicationTarget segrepTarget; + private IndexShard indexShard, spyIndexShard; + private ReplicationCheckpoint repCheckpoint; + private ByteBuffersDataOutput buffer; private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); + private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata(IndexFileNames.SEGMENTS, 5L, "different", Version.LATEST); private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( @@ -55,25 +57,34 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { 0 ); - SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); + private static final Store.MetadataSnapshot SI_SNAPSHOT_DIFFERENT = new Store.MetadataSnapshot( + Map.of(SEGMENTS_FILE_DIFF.name(), SEGMENTS_FILE_DIFF), + null, + 0 + ); + + SegmentInfos testSegmentInfos; @Override public void setUp() throws Exception { super.setUp(); - buffer = new ByteBuffersDataOutput(); - try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { - testSegmentInfos.write(indexOutput); - } - - repCheckpoint = mock(ReplicationCheckpoint.class); Settings indexSettings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, org.opensearch.Version.CURRENT) .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build(); + indexShard = newStartedShard(false, indexSettings, new NRTReplicationEngineFactory()); spyIndexShard = spy(indexShard); - Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), any(), anyLong()); + + Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), anyLong()); + testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo(); + buffer = new ByteBuffersDataOutput(); + try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { + testSegmentInfos.write(indexOutput); + } + repCheckpoint = new ReplicationCheckpoint(spyIndexShard.shardId(), spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), + spyIndexShard.seqNoStats().getLocalCheckpoint(), testSegmentInfos.version); } public void testSuccessfulResponse_startReplication() { @@ -96,9 +107,13 @@ public void getSegmentFiles( Store store, ActionListener listener ) { + assertEquals(filesToFetch.size(),2); + assert(filesToFetch.contains(SEGMENTS_FILE)); + assert(filesToFetch.contains(PENDING_DELETE_FILE)); listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); } }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( SegmentReplicationTargetService.SegmentReplicationListener.class ); @@ -108,7 +123,7 @@ public void getSegmentFiles( @Override public void onResponse(Void replicationResponse) { try { - verify(spyIndexShard, times(1)).finalizeReplication(any(), any(), anyLong()); + verify(spyIndexShard, times(1)).finalizeReplication(any(), anyLong()); } catch (IOException ex) { Assert.fail(); } @@ -116,12 +131,14 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { + logger.error("Unexpected test error", e); 
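                // failing here surfaces the unexpected error; the log line above records the cause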
Assert.fail(); } }); } public void testFailureResponse_getCheckpointMetadata() { + Exception exception = new Exception("dummy failure"); SegmentReplicationSource segrepSource = new SegmentReplicationSource() { @Override @@ -163,6 +180,7 @@ public void onFailure(Exception e) { } public void testFailureResponse_getSegmentFiles() { + Exception exception = new Exception("dummy failure"); SegmentReplicationSource segrepSource = new SegmentReplicationSource() { @Override @@ -203,7 +221,8 @@ public void onFailure(Exception e) { }); } - public void testFailure_finalizeReplication() throws IOException { + public void testFailure_finalizeReplicationThrows() throws IOException { + IOException exception = new IOException("dummy failure"); SegmentReplicationSource segrepSource = new SegmentReplicationSource() { @Override @@ -231,7 +250,7 @@ public void getSegmentFiles( ); segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); - doThrow(exception).when(spyIndexShard).finalizeReplication(any(), any(), anyLong()); + doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); segrepTarget.startReplication(new ActionListener() { @Override @@ -246,9 +265,50 @@ public void onFailure(Exception e) { }); } + public void testFailure_differentSegmentFiles() throws IOException { + + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = spy(new SegmentReplicationTarget(repCheckpoint, indexShard, segrepSource, segRepListener)); + when(segrepTarget.getMetadataSnapshot()).thenReturn(SI_SNAPSHOT_DIFFERENT); + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + logger.error(e); + assert(e instanceof IllegalStateException); + } + }); + } + @Override public void tearDown() throws Exception { - logger.info(indexShard.store()); super.tearDown(); segrepTarget.markAsDone(); closeShards(spyIndexShard, indexShard); From f3669186c25de533eb3c4db740560431a1b68e8e Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 9 Jun 2022 10:29:16 -0700 Subject: [PATCH 4/6] fix style check Signed-off-by: Poojita Raj --- .../org/opensearch/index/store/Store.java | 5 +--- .../replication/SegmentReplicationTarget.java | 27 ++++++++++++------- .../SegmentReplicationTargetTests.java | 26 ++++++++++++------ 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 86c9995c268e9..083581a801c9e 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -715,10 +715,7 @@ public void cleanupAndVerify(String reason, MetadataSnapshot 
sourceMetadata) thr * @param localSnapshot The local snapshot from in memory SegmentInfos. * @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup. */ - public void cleanupAndPreserveLatestCommitPoint( - String reason, - MetadataSnapshot localSnapshot - ) throws IOException { + public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot localSnapshot) throws IOException { // fetch a snapshot from the latest on disk Segments_N file. This can be behind // the passed in local in memory snapshot, so we want to ensure files it references are not removed. metadataLock.writeLock().lock(); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 05fcbe880862b..89c4b574300cf 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -158,7 +158,12 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) .collect(Collectors.toList()); @@ -188,18 +193,14 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, try { // Deserialize the new SegmentInfos object sent from the primary. final ReplicationCheckpoint responseCheckpoint = checkpointInfoResponse.getCheckpoint(); - System.out.println(responseCheckpoint.getSegmentsGen()); SegmentInfos infos = SegmentInfos.readCommit( store.directory(), toIndexInput(checkpointInfoResponse.getInfosBytes()), responseCheckpoint.getSegmentsGen() ); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); - store.cleanupAndPreserveLatestCommitPoint( - "finalize - clean with in memory infos", - store.getMetadata(infos) - ); - //method/function that checks if some segment doesn't match with that of primary we + store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); + // method/function that checks if some segment doesn't match with that of primary we } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. 
// this means we transferred files from the remote that have not be checksummed and they are @@ -216,11 +217,19 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, logger.debug("Failed to clean lucene index", e); ex.addSuppressed(e); } - ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex); + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); fail(rfe, true); throw rfe; } catch (Exception ex) { - ReplicationFailedException rfe = new ReplicationFailedException(indexShard.shardId(), "failed to clean after replication", ex); + ReplicationFailedException rfe = new ReplicationFailedException( + indexShard.shardId(), + "failed to clean after replication", + ex + ); fail(rfe, true); throw rfe; } finally { diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index e2308173f4c8b..13ee9133bdba1 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -48,7 +48,12 @@ public class SegmentReplicationTargetTests extends IndexShardTestCase { private ByteBuffersDataOutput buffer; private static final StoreFileMetadata SEGMENTS_FILE = new StoreFileMetadata(IndexFileNames.SEGMENTS, 1L, "0", Version.LATEST); - private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata(IndexFileNames.SEGMENTS, 5L, "different", Version.LATEST); + private static final StoreFileMetadata SEGMENTS_FILE_DIFF = new StoreFileMetadata( + IndexFileNames.SEGMENTS, + 5L, + "different", + Version.LATEST + ); private static final StoreFileMetadata PENDING_DELETE_FILE = new StoreFileMetadata("pendingDelete.del", 1L, "1", Version.LATEST); private static final Store.MetadataSnapshot SI_SNAPSHOT = new Store.MetadataSnapshot( @@ -78,13 +83,18 @@ public void setUp() throws Exception { spyIndexShard = spy(indexShard); Mockito.doNothing().when(spyIndexShard).finalizeReplication(any(SegmentInfos.class), anyLong()); - testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo(); + testSegmentInfos = spyIndexShard.store().readLastCommittedSegmentsInfo(); buffer = new ByteBuffersDataOutput(); try (ByteBuffersIndexOutput indexOutput = new ByteBuffersIndexOutput(buffer, "", null)) { testSegmentInfos.write(indexOutput); } - repCheckpoint = new ReplicationCheckpoint(spyIndexShard.shardId(), spyIndexShard.getPendingPrimaryTerm(), testSegmentInfos.getGeneration(), - spyIndexShard.seqNoStats().getLocalCheckpoint(), testSegmentInfos.version); + repCheckpoint = new ReplicationCheckpoint( + spyIndexShard.shardId(), + spyIndexShard.getPendingPrimaryTerm(), + testSegmentInfos.getGeneration(), + spyIndexShard.seqNoStats().getLocalCheckpoint(), + testSegmentInfos.version + ); } public void testSuccessfulResponse_startReplication() { @@ -107,9 +117,9 @@ public void getSegmentFiles( Store store, ActionListener listener ) { - assertEquals(filesToFetch.size(),2); - assert(filesToFetch.contains(SEGMENTS_FILE)); - assert(filesToFetch.contains(PENDING_DELETE_FILE)); + assertEquals(filesToFetch.size(), 2); + assert (filesToFetch.contains(SEGMENTS_FILE)); + assert (filesToFetch.contains(PENDING_DELETE_FILE)); listener.onResponse(new 
GetSegmentFilesResponse(filesToFetch)); } }; @@ -302,7 +312,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { logger.error(e); - assert(e instanceof IllegalStateException); + assert (e instanceof IllegalStateException); } }); } From ebb2b5ea0f55a26822d3c4f2c62e309f85d7fe28 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 9 Jun 2022 13:32:00 -0700 Subject: [PATCH 5/6] addressing comments + fix gradle check Signed-off-by: Poojita Raj --- .../java/org/opensearch/OpenSearchException.java | 6 ++++++ .../index/engine/NRTReplicationEngine.java | 2 -- .../java/org/opensearch/index/shard/IndexShard.java | 13 +++++++++++-- .../replication/SegmentReplicationTarget.java | 4 ++++ .../common/ReplicationFailedException.java | 5 +++++ .../org/opensearch/ExceptionSerializationTests.java | 2 ++ 6 files changed, 28 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index a6a12d7ebb4f7..070e90667127a 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -1594,6 +1594,12 @@ private enum OpenSearchExceptionHandle { org.opensearch.transport.NoSeedNodeLeftException::new, 160, LegacyESVersion.V_7_10_0 + ), + REPLICATION_FAILED_EXCEPTION( + org.opensearch.indices.replication.common.ReplicationFailedException.class, + org.opensearch.indices.replication.common.ReplicationFailedException::new, + 161, + UNKNOWN_VERSION_ADDED ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 29d85271ba486..e4f4bbbba8f16 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -85,7 +85,6 @@ public NRTReplicationEngine(EngineConfig engineConfig) { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. 
- assert engineConfig.isReadOnlyReplica() : "Only replicas should update Infos"; readerManager.updateSegments(infos); // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher @@ -95,7 +94,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th rollTranslogGeneration(); } localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); - readerManager.maybeRefresh(); } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 858eae2829285..6fa1005462da2 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1364,9 +1364,18 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } + public Optional getReplicationEngine() { + if (getEngine() instanceof NRTReplicationEngine) { + return Optional.of((NRTReplicationEngine) getEngine()); + } else { + return Optional.empty(); + } + } + public void finalizeReplication(SegmentInfos infos, long seqNo) throws IOException { - assert getEngine() instanceof NRTReplicationEngine; - ((NRTReplicationEngine) getEngine()).updateSegments(infos, seqNo); + if (getReplicationEngine().isPresent()) { + getReplicationEngine().get().updateSegments(infos, seqNo); + } } /** diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 89c4b574300cf..9e1f62c051536 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -157,6 +157,10 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { From 531dedd4481cb4b8d0e9a92e696587938a86760d Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Tue, 14 Jun 2022 15:27:56 -0700 Subject: [PATCH 6/6] added test + addressed review comments Signed-off-by: Poojita Raj --- .../org/opensearch/OpenSearchException.java | 3 +- .../opensearch/index/shard/IndexShard.java | 4 +- .../org/opensearch/index/store/Store.java | 5 +- .../replication/SegmentReplicationTarget.java | 10 ++-- .../common/ReplicationFailedException.java | 26 +--------- .../opensearch/index/store/StoreTests.java | 51 ++++++++++++++++--- .../SegmentReplicationTargetTests.java | 48 ++++++++++++++++- 7 files changed, 102 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index 070e90667127a..2de25aa1cd6dc 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -67,6 +67,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static java.util.Collections.unmodifiableMap; +import static org.opensearch.Version.V_2_1_0; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_UUID_NA_VALUE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.common.xcontent.XContentParserUtils.ensureFieldName; @@ -1599,7 +1600,7 @@ private enum OpenSearchExceptionHandle { 
org.opensearch.indices.replication.common.ReplicationFailedException.class, org.opensearch.indices.replication.common.ReplicationFailedException::new, 161, - UNKNOWN_VERSION_ADDED + V_2_1_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 6fa1005462da2..d083d947dfbc3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -162,8 +162,8 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.recovery.RecoveryTarget; import org.opensearch.indices.replication.checkpoint.PublishCheckpointRequest; -import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.rest.RestStatus; @@ -1364,7 +1364,7 @@ public GatedCloseable acquireLastIndexCommit(boolean flushFirst) th } } - public Optional getReplicationEngine() { + private Optional getReplicationEngine() { if (getEngine() instanceof NRTReplicationEngine) { return Optional.of((NRTReplicationEngine) getEngine()); } else { diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index 083581a801c9e..2309004c0777d 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -726,12 +726,12 @@ public void cleanupAndPreserveLatestCommitPoint(String reason, MetadataSnapshot } } - private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nullable MetadataSnapshot additionalSnapshot) + private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable MetadataSnapshot additionalSnapshot) throws IOException { assert metadataLock.isWriteLockedByCurrentThread(); for (String existingFile : directory.listAll()) { if (Store.isAutogenerated(existingFile) - || remoteSnapshot.contains(existingFile) + || localSnapshot.contains(existingFile) || (additionalSnapshot != null && additionalSnapshot.contains(existingFile))) { // don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete // checksum) @@ -739,7 +739,6 @@ private void cleanupFiles(String reason, MetadataSnapshot remoteSnapshot, @Nulla } try { directory.deleteFile(reason, existingFile); - // FNF should not happen since we hold a write lock? } catch (IOException ex) { if (existingFile.startsWith(IndexFileNames.SEGMENTS) || existingFile.startsWith(CORRUPTED_MARKER_NAME_PREFIX)) { // TODO do we need to also fail this if we can't delete the pending commit file? 
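Taken together, cleanupAndPreserveLatestCommitPoint and the cleanupFiles hunk above implement one rule: a file is deleted only when neither the local in-memory snapshot nor the latest on-disk commit point references it. A minimal, self-contained sketch of that rule, using plain java.util sets as stand-ins for Store.MetadataSnapshot (class, method, and file names here are illustrative assumptions, not the real Store API):

import java.util.List;
import java.util.Set;

public class CleanupRuleSketch {
    // A file survives if either snapshot references it; autogenerated files
    // (which the real Store also spares) are omitted for brevity.
    static boolean shouldDelete(String file, Set<String> localSnapshot, Set<String> lastCommit) {
        return localSnapshot.contains(file) == false && lastCommit.contains(file) == false;
    }

    public static void main(String[] args) {
        Set<String> local = Set.of("_1.cfs", "_1.si");                  // in-memory SegmentInfos
        Set<String> commit = Set.of("_0.cfs", "_0.si", "segments_2");   // latest segments_N on disk
        for (String f : List.of("_0.cfs", "_1.cfs", "_stale.cfs", "segments_2")) {
            System.out.println(f + " -> " + (shouldDelete(f, local, commit) ? "delete" : "keep"));
        }
    }
}

This is why the remoteSnapshot parameter could be dropped: once verification against the primary moved into getFiles, cleanup only needs the union of the local snapshot and the last commit.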
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 9e1f62c051536..fb68e59f3b2ef 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -36,12 +36,12 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; /** * Represents the target of a replication event. @@ -73,7 +73,6 @@ protected void closeInternal() { try { multiFileWriter.close(); } finally { - // free store. increment happens in constructor super.closeInternal(); } } @@ -169,8 +168,7 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener filesToFetch = Stream.concat(diff.missing.stream(), diff.different.stream()) - .collect(Collectors.toList()); + final List filesToFetch = new ArrayList(diff.missing); Set storeFiles = new HashSet<>(Arrays.asList(store.directory().listAll())); final Set pendingDeleteFiles = checkpointInfo.getPendingDeleteFiles() @@ -185,8 +183,9 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { @@ -204,7 +203,6 @@ private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ); indexShard.finalizeReplication(infos, responseCheckpoint.getSeqNo()); store.cleanupAndPreserveLatestCommitPoint("finalize - clean with in memory infos", store.getMetadata(infos)); - // method/function that checks if some segment doesn't match with that of primary we } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { // this is a fatal exception at this stage. // this means we transferred files from the remote that have not be checksummed and they are diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java index d2b263c3dee7a..afdd0ce466f9b 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -6,30 +6,6 @@ * compatible open source license. */ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. 
- */ - package org.opensearch.indices.replication.common; import org.opensearch.OpenSearchException; @@ -56,7 +32,7 @@ public ReplicationFailedException(IndexShard shard, @Nullable String extraInfo, } public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, Throwable cause) { - super(shardId + ": Replication failed " + ("on ") + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); + super(shardId + ": Replication failed on " + (extraInfo == null ? "" : " (" + extraInfo + ")"), cause); } public ReplicationFailedException(StreamInput in) throws IOException { diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index d99bde4764adf..b6bced9f038c0 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -31,7 +31,6 @@ package org.opensearch.index.store; -import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -51,7 +50,6 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; -import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.store.ByteBuffersDirectory; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; @@ -60,9 +58,12 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.NIOFSDirectory; -import org.apache.lucene.util.BytesRef; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.store.BaseDirectoryWrapper; import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.Version; +import org.hamcrest.Matchers; import org.opensearch.ExceptionsHelper; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.UUIDs; @@ -81,9 +82,8 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.test.DummyShardLock; -import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; -import org.hamcrest.Matchers; +import org.opensearch.test.OpenSearchTestCase; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -102,7 +102,6 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.unmodifiableMap; -import static org.opensearch.test.VersionUtils.randomVersion; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -114,6 +113,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.test.VersionUtils.randomVersion; public class StoreTests extends OpenSearchTestCase { @@ -1149,4 +1149,43 @@ public void testGetMetadataWithSegmentInfos() throws IOException { assertEquals(segmentInfos.getSegmentsFileName(), metadataSnapshot.getSegmentsFile().name()); store.close(); } + + public void testcleanupAndPreserveLatestCommitPoint() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), 
new DummyShardLock(shardId)); + IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec( + TestUtil.getDefaultCodec() + ); + indexWriterConfig.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); + IndexWriter writer = new IndexWriter(store.directory(), indexWriterConfig); + int docs = 1 + random().nextInt(100); + writer.commit(); + Document doc = new Document(); + doc.add(new TextField("id", "" + docs++, random().nextBoolean() ? Field.Store.YES : Field.Store.NO)); + doc.add( + new TextField( + "body", + TestUtil.randomRealisticUnicodeString(random()), + random().nextBoolean() ? Field.Store.YES : Field.Store.NO + ) + ); + doc.add(new SortedDocValuesField("dv", new BytesRef(TestUtil.randomRealisticUnicodeString(random())))); + writer.addDocument(doc); + writer.commit(); + writer.close(); + + Store.MetadataSnapshot commitMetadata = store.getMetadata(); + + Store.MetadataSnapshot refreshMetadata = Store.MetadataSnapshot.EMPTY; + + store.cleanupAndPreserveLatestCommitPoint("test", refreshMetadata); + + // we want to ensure commitMetadata files are preserved after calling cleanup + for (String existingFile : store.directory().listAll()) { + assert (commitMetadata.contains(existingFile) == true); + } + + deleteContent(store.directory()); + IOUtils.close(store); + } } diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 13ee9133bdba1..a0944ee249859 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -9,6 +9,7 @@ package org.opensearch.indices.replication; import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; @@ -231,7 +232,7 @@ public void onFailure(Exception e) { }); } - public void testFailure_finalizeReplicationThrows() throws IOException { + public void testFailure_finalizeReplication_IOException() throws IOException { IOException exception = new IOException("dummy failure"); SegmentReplicationSource segrepSource = new SegmentReplicationSource() { @@ -275,6 +276,50 @@ public void onFailure(Exception e) { }); } + public void testFailure_finalizeReplication_IndexFormatException() throws IOException { + + IndexFormatTooNewException exception = new IndexFormatTooNewException("string", 1, 2, 1); + SegmentReplicationSource segrepSource = new SegmentReplicationSource() { + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + listener.onResponse(new CheckpointInfoResponse(checkpoint, SI_SNAPSHOT, buffer.toArrayCopy(), Set.of(PENDING_DELETE_FILE))); + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + Store store, + ActionListener listener + ) { + listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); + } + }; + SegmentReplicationTargetService.SegmentReplicationListener segRepListener = mock( + SegmentReplicationTargetService.SegmentReplicationListener.class + ); + segrepTarget = new SegmentReplicationTarget(repCheckpoint, spyIndexShard, segrepSource, segRepListener); + + 
doThrow(exception).when(spyIndexShard).finalizeReplication(any(), anyLong()); + + segrepTarget.startReplication(new ActionListener() { + @Override + public void onResponse(Void replicationResponse) { + Assert.fail(); + } + + @Override + public void onFailure(Exception e) { + assertEquals(exception, e.getCause()); + } + }); + } + public void testFailure_differentSegmentFiles() throws IOException { SegmentReplicationSource segrepSource = new SegmentReplicationSource() { @@ -311,7 +356,6 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { - logger.error(e); assert (e instanceof IllegalStateException); } });
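One detail from earlier in the series worth making concrete: the Stage enum in SegmentReplicationState was reordered to DONE = 0, INIT = 1, REPLICATING = 2, and that ordering is load-bearing because stages are decoded by indexing a fixed array with the byte id. A self-contained sketch of that lookup pattern, mirroring the STAGES array visible in the diff (the fromId helper and surrounding class are illustrative assumptions):

public class StageSketch {
    enum Stage {
        DONE((byte) 0),
        INIT((byte) 1),
        REPLICATING((byte) 2);

        // decoding table: ids must be dense, unique, and start at 0
        private static final Stage[] STAGES = new Stage[Stage.values().length];
        static {
            for (Stage stage : Stage.values()) {
                assert stage.id() < STAGES.length && stage.id() >= 0;
                STAGES[stage.id()] = stage;
            }
        }

        private final byte id;

        Stage(byte id) {
            this.id = id;
        }

        public byte id() {
            return id;
        }

        public static Stage fromId(byte id) {
            if (id < 0 || id >= STAGES.length) {
                throw new IllegalArgumentException("No stage found for id [" + id + "]");
            }
            return STAGES[id];
        }
    }

    public static void main(String[] args) {
        System.out.println(Stage.fromId((byte) 2)); // REPLICATING
    }
}

Under this scheme a gap or duplicate in the byte ids would either leave a null slot or silently overwrite an entry, which is why the reordering assigned DONE the freed id 0 rather than leaving the old duplicate ids in place.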