From 2a3683e4007de25fd1304912bc7405b07552991a Mon Sep 17 00:00:00 2001 From: Ankit Kala Date: Wed, 7 Jun 2023 22:26:04 +0530 Subject: [PATCH] SegRep with Remote: Add Remote store as a segment replication source (#7653) * SegRep with Remote: Add Remote store as a segment replication source Signed-off-by: Ankit Kala * Fix Gradle check Signed-off-by: Ankit Kala * Retrying Gradle check Signed-off-by: Ankit Kala --------- Signed-off-by: Ankit Kala --- CHANGELOG.md | 3 +- .../replication/SegmentReplicationIT.java | 16 ++ .../SegmentReplicationRemoteStoreIT.java | 2 +- .../SegmentReplicationUsingRemoteStoreIT.java | 66 ++++++++ .../index/engine/InternalEngine.java | 10 +- .../opensearch/index/shard/IndexShard.java | 65 ++++---- .../shard/RemoteStoreRefreshListener.java | 7 +- .../opensearch/index/shard/StoreRecovery.java | 2 +- .../index/store/RemoteDirectory.java | 11 +- .../store/RemoteSegmentStoreDirectory.java | 7 +- .../org/opensearch/index/store/Store.java | 3 +- .../recovery/PeerRecoveryTargetService.java | 2 +- .../PrimaryShardReplicationSource.java | 4 +- .../RemoteStoreReplicationSource.java | 99 ++++++++++++ .../replication/SegmentReplicationSource.java | 6 +- .../SegmentReplicationSourceFactory.java | 18 ++- .../replication/SegmentReplicationTarget.java | 10 +- .../index/engine/InternalEngineTests.java | 2 + .../RemoteStoreRefreshListenerTests.java | 4 +- .../SegmentReplicationIndexShardTests.java | 8 +- ...tReplicationWithRemoteIndexShardTests.java | 43 +++++ .../PrimaryShardReplicationSourceTests.java | 7 +- .../RemoteStoreReplicationSourceTests.java | 150 ++++++++++++++++++ .../SegmentReplicationTargetServiceTests.java | 3 +- .../SegmentReplicationTargetTests.java | 14 +- .../replication/TestReplicationSource.java | 4 +- .../index/shard/IndexShardTestCase.java | 2 +- 27 files changed, 486 insertions(+), 82 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java create mode 100644 server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java create mode 100644 server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java create mode 100644 server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 08994a6cd4bad..74b382190fb11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 2.x] ### Added - Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452)) +- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653)) ### Dependencies - Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897)) @@ -106,7 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Removed ### Fixed -- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924 +- Fixing error: adding a new/forgotten parameter to the configuration for checking the config on startup in plugins/repository-s3 #7924 ### Security diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index c4420922d1dcb..08d1878c2df27 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -47,6 +47,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.lease.Releasable; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.index.IndexModule; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationPressureService; @@ -580,6 +581,7 @@ public void testDeleteOperations() throws Exception { * from xlog. */ public void testReplicationPostDeleteAndForceMerge() throws Exception { + assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); final String replica = internalCluster().startNode(); @@ -785,6 +787,10 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception { } public void testPressureServiceStats() throws Exception { + assumeFalse( + "Skipping the test as pressure service is not compatible with SegRep and Remote store yet.", + segmentReplicationWithRemoteEnabled() + ); final String primaryNode = internalCluster().startNode(); createIndex(INDEX_NAME); final String replicaNode = internalCluster().startNode(); @@ -874,6 +880,7 @@ public void testPressureServiceStats() throws Exception { * @throws Exception when issue is encountered */ public void testScrollCreatedOnReplica() throws Exception { + assumeFalse("Skipping the test with Remote store as its flaky.", segmentReplicationWithRemoteEnabled()); // create the cluster with one primary node containing primary shard and replica node containing replica shard final String primary = internalCluster().startNode(); createIndex(INDEX_NAME); @@ -963,6 +970,11 @@ public void testScrollCreatedOnReplica() throws Exception { * @throws Exception when issue is encountered */ public void testScrollWithOngoingSegmentReplication() throws Exception { + assumeFalse( + "Skipping the test as its not compatible with segment replication with remote store yet.", + segmentReplicationWithRemoteEnabled() + ); + // create the cluster with one primary node containing primary shard and replica node containing replica shard final String primary = internalCluster().startNode(); prepareCreate( @@ -1249,4 +1261,8 @@ public void testPrimaryReceivesDocsDuringReplicaRecovery() throws Exception { waitForSearchableDocs(2, nodes); } + private boolean segmentReplicationWithRemoteEnabled() { + return IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexSettings()).booleanValue() + && "true".equalsIgnoreCase(featureFlagSettings().get(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL)); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java index 055f997fbe197..ab0c0cc3aec77 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java @@ -30,7 +30,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT { - private static final String REPOSITORY_NAME = "test-remore-store-repo"; + private static final String REPOSITORY_NAME = "test-remote-store-repo"; @Override public Settings indexSettings() { diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java new file mode 100644 index 0000000000000..ad6e358cb9da1 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationUsingRemoteStoreIT.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationIT; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +/** + * This class runs Segment Replication Integ test suite with remote store enabled. + * Setup is similar to SegmentReplicationRemoteStoreIT but this also enables the segment replication using remote store which + * is behind SEGMENT_REPLICATION_EXPERIMENTAL flag. After this is moved out of experimental, we can combine and keep only one + * test suite for Segment and Remote store integration tests. + */ +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationUsingRemoteStoreIT extends SegmentReplicationIT { + + private static final String REPOSITORY_NAME = "test-remote-store-repo"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.REMOTE_STORE, "true") + .put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true") + .build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e777a9ef724a9..63afc6585a99d 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2760,6 +2760,13 @@ public final long lastRefreshedCheckpoint() { return lastRefreshedCheckpointListener.refreshedCheckpoint.get(); } + /** + * Returns the current local checkpoint getting refreshed internally. + */ + public final long currentOngoingRefreshCheckpoint() { + return lastRefreshedCheckpointListener.pendingCheckpoint; + } + private final Object refreshIfNeededMutex = new Object(); /** @@ -2777,10 +2784,11 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) { private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener { final AtomicLong refreshedCheckpoint; - private long pendingCheckpoint; + volatile long pendingCheckpoint; LastRefreshedCheckpointListener(long initialLocalCheckpoint) { this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint); + this.pendingCheckpoint = initialLocalCheckpoint; } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d6ab3a764abf2..ecc46e720fdaf 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2231,7 +2231,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t assert currentEngineReference.get() == null : "engine is running"; verifyNotClosed(); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4404,7 +4404,7 @@ public void close() throws IOException { }; IOUtils.close(currentEngineReference.getAndSet(readOnlyEngine)); if (indexSettings.isRemoteStoreEnabled()) { - syncSegmentsFromRemoteSegmentStore(false); + syncSegmentsFromRemoteSegmentStore(false, true, true); } if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { syncRemoteTranslogAndUpdateGlobalCheckpoint(); @@ -4454,23 +4454,15 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); } - /** - * Downloads segments from remote segment store. This method will download segments till - * last refresh checkpoint. - * @param overrideLocal flag to override local segment files with those in remote store - * @throws IOException if exception occurs while reading segments from remote store - */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOException { - syncSegmentsFromRemoteSegmentStore(overrideLocal, true); - } - /** * Downloads segments from remote segment store. * @param overrideLocal flag to override local segment files with those in remote store * @param refreshLevelSegmentSync last refresh checkpoint is used if true, commit checkpoint otherwise + * @param shouldCommit if the shard requires committing the changes after sync from remote. * @throws IOException if exception occurs while reading segments from remote store */ - public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync) throws IOException { + public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean refreshLevelSegmentSync, boolean shouldCommit) + throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; @@ -4529,29 +4521,34 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re indexInput, remoteSegmentMetadata.getGeneration() ); - long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); - // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs - // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, - // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the - // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest - // commit. - Optional localMaxSegmentInfos = localSegmentFiles.stream() - .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) - .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); - if (localMaxSegmentInfos.isPresent() - && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) - 1) { - // If remote translog is not enabled, local translog will be created with different UUID. - // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs - // to be same. Following code block make sure to have the same UUID. - if (indexSettings.isRemoteTranslogStoreEnabled() == false) { - SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo(); - Map userData = new HashMap<>(infosSnapshot.getUserData()); - userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY)); - infosSnapshot.setUserData(userData, false); + if (shouldCommit) { + long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY)); + // Following code block makes sure to use SegmentInfosSnapshot in the remote store if generation differs + // with local filesystem. If local filesystem already has segments_N+2 and infosSnapshot has generation N, + // after commit, there would be 2 files that would be created segments_N+1 and segments_N+2. With the + // policy of preserving only the latest commit, we will delete segments_N+1 which in fact is the part of the latest + // commit. + Optional localMaxSegmentInfos = localSegmentFiles.stream() + .filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) + .max(Comparator.comparingLong(SegmentInfos::generationFromSegmentsFileName)); + if (localMaxSegmentInfos.isPresent() + && infosSnapshot.getGeneration() < SegmentInfos.generationFromSegmentsFileName(localMaxSegmentInfos.get()) + - 1) { + // If remote translog is not enabled, local translog will be created with different UUID. + // This fails in Store.trimUnsafeCommits() as translog UUID of checkpoint and SegmentInfos needs + // to be same. Following code block make sure to have the same UUID. + if (indexSettings.isRemoteTranslogStoreEnabled() == false) { + SegmentInfos localSegmentInfos = store.readLastCommittedSegmentsInfo(); + Map userData = new HashMap<>(infosSnapshot.getUserData()); + userData.put(TRANSLOG_UUID_KEY, localSegmentInfos.userData.get(TRANSLOG_UUID_KEY)); + infosSnapshot.setUserData(userData, false); + } + storeDirectory.deleteFile(localMaxSegmentInfos.get()); } - storeDirectory.deleteFile(localMaxSegmentInfos.get()); + store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); + } else { + finalizeReplication(infosSnapshot); } - store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint); } } } catch (IOException e) { diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 8f71a9f3ff658..b24a78971b71f 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -334,12 +334,11 @@ private boolean isRefreshAfterCommit() throws IOException { } void uploadMetadata(Collection localSegmentsPostRefresh, SegmentInfos segmentInfos) throws IOException { - final long maxSeqNoFromSegmentInfos = indexShard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos); - + final long maxSeqNo = ((InternalEngine) indexShard.getEngine()).currentOngoingRefreshCheckpoint(); SegmentInfos segmentInfosSnapshot = segmentInfos.clone(); Map userData = segmentInfosSnapshot.getUserData(); - userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNoFromSegmentInfos)); - userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNoFromSegmentInfos)); + userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(maxSeqNo)); + userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); segmentInfosSnapshot.setUserData(userData, false); remoteDirectory.uploadMetadata( diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 9393a5ac38ac2..02397bc356539 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -453,7 +453,7 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco remoteStore.incRef(); try { // Download segments from remote segment store - indexShard.syncSegmentsFromRemoteSegmentStore(true); + indexShard.syncSegmentsFromRemoteSegmentStore(true, true, true); if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index bb2ff47cf0072..5192fd49b91f6 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -18,6 +18,7 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Collection; import java.util.Collections; @@ -100,7 +101,15 @@ public IndexOutput createOutput(String name, IOContext context) { */ @Override public IndexInput openInput(String name, IOContext context) throws IOException { - return new RemoteIndexInput(name, blobContainer.readBlob(name), fileLength(name)); + InputStream inputStream = null; + try { + inputStream = blobContainer.readBlob(name); + return new RemoteIndexInput(name, inputStream, fileLength(name)); + } catch (Exception e) { + // Incase the RemoteIndexInput creation fails, close the input stream to avoid file handler leak. + if (inputStream != null) inputStream.close(); + throw e; + } } /** diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 6549bfc6d91f4..93fa4b7eff7b7 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -137,8 +137,7 @@ public RemoteSegmentMetadata init() throws IOException { * @return Map of segment filename to uploaded filename with checksum * @throws IOException if there were any failures in reading the metadata file */ - private RemoteSegmentMetadata readLatestMetadataFile() throws IOException { - Map segmentMetadataMap = new HashMap<>(); + public RemoteSegmentMetadata readLatestMetadataFile() throws IOException { RemoteSegmentMetadata remoteSegmentMetadata = null; Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); @@ -199,6 +198,10 @@ public static UploadedSegmentMetadata fromString(String uploadedFilename) { String[] values = uploadedFilename.split(SEPARATOR); return new UploadedSegmentMetadata(values[0], values[1], values[2], Long.parseLong(values[3])); } + + public String getOriginalFilename() { + return originalFilename; + } } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index dae698b1c3b46..2c0d5decebba8 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -399,7 +399,8 @@ public static RecoveryDiff segmentReplicationDiff(Map missing.add(value); } else { final StoreFileMetadata fileMetadata = target.get(value.name()); - if (fileMetadata.isSame(value)) { + // match segments using checksum + if (fileMetadata.checksum().equals(value.checksum())) { identical.add(value); } else { different.add(value); diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 5fb84a165c498..a08d3182fa156 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -245,7 +245,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi indexShard.prepareForIndexRecovery(); final boolean hasRemoteSegmentStore = indexShard.indexSettings().isRemoteStoreEnabled(); if (hasRemoteSegmentStore) { - indexShard.syncSegmentsFromRemoteSegmentStore(false, false); + indexShard.syncSegmentsFromRemoteSegmentStore(false, false, true); } final boolean hasRemoteTranslog = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled(); final boolean hasNoTranslog = indexShard.indexSettings().isRemoteSnapshot(); diff --git a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java index b211d81c1c76a..5455be2a69799 100644 --- a/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java @@ -13,7 +13,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.io.stream.Writeable; -import org.opensearch.index.store.Store; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.recovery.RetryableTransportClient; @@ -79,7 +79,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { final Writeable.Reader reader = GetSegmentFilesResponse::new; diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java new file mode 100644 index 0000000000000..b8941737a7c57 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.util.Version; +import org.opensearch.action.ActionListener; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; +import org.opensearch.index.store.StoreFileMetadata; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Implementation of a {@link SegmentReplicationSource} where the source is remote store. + * + * @opensearch.internal + */ +public class RemoteStoreReplicationSource implements SegmentReplicationSource { + + private static final Logger logger = LogManager.getLogger(PrimaryShardReplicationSource.class); + + private final IndexShard indexShard; + + public RemoteStoreReplicationSource(IndexShard indexShard) { + this.indexShard = indexShard; + } + + @Override + public void getCheckpointMetadata( + long replicationId, + ReplicationCheckpoint checkpoint, + ActionListener listener + ) { + FilterDirectory remoteStoreDirectory = (FilterDirectory) indexShard.remoteStore().directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteDirectory = (RemoteSegmentStoreDirectory) byteSizeCachingStoreDirectory.getDelegate(); + + Map metadataMap; + // TODO: Need to figure out a way to pass this information for segment metadata via remote store. + final Version version = indexShard.getSegmentInfosSnapshot().get().getCommitLuceneVersion(); + try { + metadataMap = remoteDirectory.readLatestMetadataFile() + .getMetadata() + .entrySet() + .stream() + .collect( + Collectors.toMap( + e -> e.getKey(), + e -> new StoreFileMetadata( + e.getValue().getOriginalFilename(), + e.getValue().getLength(), + Store.digestToString(Long.valueOf(e.getValue().getChecksum())), + version, + null + ) + ) + ); + // TODO: GET current checkpoint from remote store. + listener.onResponse(new CheckpointInfoResponse(checkpoint, metadataMap, null)); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public void getSegmentFiles( + long replicationId, + ReplicationCheckpoint checkpoint, + List filesToFetch, + IndexShard indexShard, + ActionListener listener + ) { + try { + indexShard.syncSegmentsFromRemoteSegmentStore(false, true, false); + listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); + } catch (Exception e) { + listener.onFailure(e); + } + } + + @Override + public String getDescription() { + return "remote store"; + } +} diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java index 2fa74819fe4de..79b9b31e3d5c3 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java @@ -10,7 +10,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException; -import org.opensearch.index.store.Store; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -38,14 +38,14 @@ public interface SegmentReplicationSource { * @param replicationId {@link long} - ID of the replication event. * @param checkpoint {@link ReplicationCheckpoint} Checkpoint to fetch metadata for. * @param filesToFetch {@link List} List of files to fetch. - * @param store {@link Store} Reference to the local store. + * @param indexShard {@link IndexShard} Reference to the IndexShard. * @param listener {@link ActionListener} Listener that completes with the list of files copied. */ void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java index 1867fc59c5a56..238e316c3b585 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceFactory.java @@ -38,13 +38,17 @@ public SegmentReplicationSourceFactory( } public SegmentReplicationSource get(IndexShard shard) { - return new PrimaryShardReplicationSource( - shard.recoveryState().getTargetNode(), - shard.routingEntry().allocationId().getId(), - transportService, - recoverySettings, - getPrimaryNode(shard.shardId()) - ); + if (shard.indexSettings().isSegRepWithRemoteEnabled()) { + return new RemoteStoreReplicationSource(shard); + } else { + return new PrimaryShardReplicationSource( + shard.recoveryState().getTargetNode(), + shard.routingEntry().allocationId().getId(), + transportService, + recoverySettings, + getPrimaryNode(shard.shardId()) + ); + } } private DiscoveryNode getPrimaryNode(ShardId shardId) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index ae722b8742e94..22c68ad46fea6 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -204,10 +204,18 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener listener) { + // TODO: Refactor the logic so that finalize doesn't have to be invoked for remote store as source + if (source instanceof RemoteStoreReplicationSource) { + ActionListener.completeWith(listener, () -> { + state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); + return null; + }); + return; + } ActionListener.completeWith(listener, () -> { cancellableThreads.checkForCancel(); state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION); diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index 91e45bd28f710..3bd8e09cab777 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -6637,6 +6637,7 @@ public void testLastRefreshCheckpoint() throws Exception { while (done.get() == false) { long checkPointBeforeRefresh = engine.getProcessedLocalCheckpoint(); engine.refresh("test", randomFrom(Engine.SearcherScope.values()), true); + assertThat(engine.currentOngoingRefreshCheckpoint(), greaterThanOrEqualTo(engine.lastRefreshedCheckpoint())); assertThat(engine.lastRefreshedCheckpoint(), greaterThanOrEqualTo(checkPointBeforeRefresh)); } }); @@ -6650,6 +6651,7 @@ public void testLastRefreshCheckpoint() throws Exception { thread.join(); } engine.refresh("test"); + assertThat(engine.currentOngoingRefreshCheckpoint(), greaterThanOrEqualTo(engine.lastRefreshedCheckpoint())); assertThat(engine.lastRefreshedCheckpoint(), equalTo(engine.getProcessedLocalCheckpoint())); } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 8408463a820b6..8dea8d272b4e8 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -421,14 +421,14 @@ private Tuple m return Tuple.tuple(refreshListener, remoteRefreshSegmentPressureService); } - private static class TestFilterDirectory extends FilterDirectory { + public static class TestFilterDirectory extends FilterDirectory { /** * Sole constructor, typically called from sub-classes. * * @param in input directory */ - protected TestFilterDirectory(Directory in) { + public TestFilterDirectory(Directory in) { super(in); } } diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 1d0bc4cd1910b..0ee3e81678511 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -841,7 +841,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(Collections.emptyList())); @@ -911,7 +911,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be reached"); @@ -951,7 +951,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { // randomly resolve the listener, indicating the source has resolved. @@ -993,7 +993,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) {} }; diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java new file mode 100644 index 0000000000000..cae15a2c53f3f --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationWithRemoteIndexShardTests.java @@ -0,0 +1,43 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.NRTReplicationEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.indices.replication.common.ReplicationType; + +import java.io.IOException; + +public class SegmentReplicationWithRemoteIndexShardTests extends OpenSearchIndexLevelReplicationTestCase { + private static final Settings settings = Settings.builder() + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "temp-fs") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false) + .build(); + + public void testReplicaSyncingFromRemoteStore() throws IOException { + ReplicationGroup shards = createGroup(1, settings, indexMapping, new NRTReplicationEngineFactory(), createTempDir()); + final IndexShard primaryShard = shards.getPrimary(); + final IndexShard replicaShard = shards.getReplicas().get(0); + shards.startPrimary(); + shards.startAll(); + indexDoc(primaryShard, "_doc", "1"); + indexDoc(primaryShard, "_doc", "2"); + primaryShard.refresh("test"); + assertDocs(primaryShard, "1", "2"); + flushShard(primaryShard); + + replicaShard.syncSegmentsFromRemoteSegmentStore(true, true, false); + assertDocs(replicaShard, "1", "2"); + closeShards(primaryShard, replicaShard); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java index fdd707ae88195..4d273c71e7861 100644 --- a/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/PrimaryShardReplicationSourceTests.java @@ -21,7 +21,6 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; @@ -123,7 +122,7 @@ public void testGetSegmentFiles() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -151,7 +150,7 @@ public void testTransportTimeoutForGetSegmentFilesAction() { REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), mock(ActionListener.class) ); CapturingTransport.CapturedRequest[] requestList = transport.getCapturedRequestsAndClear(); @@ -176,7 +175,7 @@ public void testGetSegmentFiles_CancelWhileRequestOpen() throws InterruptedExcep REPLICATION_ID, checkpoint, Arrays.asList(testMetadata), - mock(Store.class), + mock(IndexShard.class), new ActionListener<>() { @Override public void onResponse(GetSegmentFilesResponse getSegmentFilesResponse) { diff --git a/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java new file mode 100644 index 0000000000000..04f821a5fc48c --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/replication/RemoteStoreReplicationSourceTests.java @@ -0,0 +1,150 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.replication; + +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.store.FilterDirectory; +import org.mockito.Mockito; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.engine.InternalEngineFactory; +import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.shard.RemoteStoreRefreshListenerTests; +import org.opensearch.index.store.RemoteSegmentStoreDirectory; +import org.opensearch.index.store.Store; +import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; + +import java.io.IOException; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class RemoteStoreReplicationSourceTests extends OpenSearchIndexLevelReplicationTestCase { + + private static final long PRIMARY_TERM = 1L; + private static final long SEGMENTS_GEN = 2L; + private static final long VERSION = 4L; + private static final long REPLICATION_ID = 123L; + private RemoteStoreReplicationSource replicationSource; + private IndexShard indexShard; + + private IndexShard mockShard; + + @Override + public void setUp() throws Exception { + super.setUp(); + indexShard = newStartedShard( + true, + Settings.builder().put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true).build(), + new InternalEngineFactory() + ); + + indexDoc(indexShard, "_doc", "1"); + indexDoc(indexShard, "_doc", "2"); + indexShard.refresh("test"); + + // mock shard + mockShard = mock(IndexShard.class); + Store store = mock(Store.class); + when(mockShard.store()).thenReturn(store); + when(store.directory()).thenReturn(indexShard.store().directory()); + Store remoteStore = mock(Store.class); + when(mockShard.remoteStore()).thenReturn(remoteStore); + RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = + (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()).getDelegate()) + .getDelegate(); + FilterDirectory remoteStoreFilterDirectory = new RemoteStoreRefreshListenerTests.TestFilterDirectory( + new RemoteStoreRefreshListenerTests.TestFilterDirectory(remoteSegmentStoreDirectory) + ); + when(remoteStore.directory()).thenReturn(remoteStoreFilterDirectory); + + replicationSource = new RemoteStoreReplicationSource(mockShard); + } + + @Override + public void tearDown() throws Exception { + closeShards(indexShard); + super.tearDown(); + } + + public void testGetCheckpointMetadata() throws ExecutionException, InterruptedException { + when(mockShard.getSegmentInfosSnapshot()).thenReturn(indexShard.getSegmentInfosSnapshot()); + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); + + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); + CheckpointInfoResponse response = res.get(); + assert (response.getCheckpoint().equals(checkpoint)); + assert (!response.getMetadataMap().isEmpty()); + } + + public void testGetCheckpointMetadataFailure() { + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); + + when(mockShard.getSegmentInfosSnapshot()).thenThrow(new RuntimeException("test")); + + assertThrows(RuntimeException.class, () -> { + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource.getCheckpointMetadata(REPLICATION_ID, checkpoint, res); + res.get(); + }); + } + + public void testGetSegmentFiles() throws ExecutionException, InterruptedException { + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); + + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), indexShard, res); + GetSegmentFilesResponse response = res.get(); + assert (response.files.isEmpty()); + assertEquals("remote store", replicationSource.getDescription()); + + } + + public void testGetSegmentFilesFailure() throws IOException { + final ReplicationCheckpoint checkpoint = new ReplicationCheckpoint( + indexShard.shardId(), + PRIMARY_TERM, + SEGMENTS_GEN, + VERSION, + Codec.getDefault().getName() + ); + Mockito.doThrow(new RuntimeException("testing")) + .when(mockShard) + .syncSegmentsFromRemoteSegmentStore(Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean()); + assertThrows(ExecutionException.class, () -> { + final PlainActionFuture res = PlainActionFuture.newFuture(); + replicationSource.getSegmentFiles(REPLICATION_ID, checkpoint, Collections.emptyList(), mockShard, res); + res.get(10, TimeUnit.SECONDS); + }); + } +} diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 1d1777758972c..9c796ec05c22a 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -22,7 +22,6 @@ import org.opensearch.index.replication.TestReplicationSource; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; -import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.ForceSyncRequest; @@ -181,7 +180,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { Assert.fail("Should not be called"); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 0e711af1afa62..ac8904527f7fb 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -128,7 +128,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { assertEquals(1, filesToFetch.size()); @@ -179,7 +179,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -222,7 +222,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onFailure(exception); @@ -265,7 +265,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -310,7 +310,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -354,7 +354,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); @@ -405,7 +405,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { listener.onResponse(new GetSegmentFilesResponse(filesToFetch)); diff --git a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java index a3adedcbdef86..cf4b3800069bf 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/TestReplicationSource.java @@ -9,7 +9,7 @@ package org.opensearch.index.replication; import org.opensearch.action.ActionListener; -import org.opensearch.index.store.Store; +import org.opensearch.index.shard.IndexShard; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.CheckpointInfoResponse; import org.opensearch.indices.replication.GetSegmentFilesResponse; @@ -35,7 +35,7 @@ public abstract void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ); diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 9681c01688337..5a01fcaa1ddaf 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -1339,7 +1339,7 @@ public void getSegmentFiles( long replicationId, ReplicationCheckpoint checkpoint, List filesToFetch, - Store store, + IndexShard indexShard, ActionListener listener ) { try (