From 61ef98089052c8bb659906952c442e5095d9552f Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 21 Feb 2023 14:44:51 +0530 Subject: [PATCH 1/9] Add support to run SegRep integ tests using remote store settings Signed-off-by: Sachin Kale --- .../SegmentReplicationRemoteStoreIT.java | 57 +++++++++++++++++++ .../index/engine/ReadOnlyEngine.java | 6 +- .../index/seqno/ReplicationTracker.java | 6 +- .../opensearch/index/shard/IndexShard.java | 45 +++++++++++---- .../opensearch/index/shard/StoreRecovery.java | 27 ++------- ...emoteBlobStoreInternalTranslogFactory.java | 4 ++ .../index/translog/RemoteFsTranslog.java | 21 +++++-- .../index/shard/IndexShardTests.java | 2 +- .../index/translog/RemoteFSTranslogTests.java | 21 +++++-- 9 files changed, 144 insertions(+), 45 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java new file mode 100644 index 0000000000000..ba4be7f545237 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/SegmentReplicationRemoteStoreIT.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.SegmentReplicationIT; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.nio.file.Path; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SegmentReplicationRemoteStoreIT extends SegmentReplicationIT { + + private static final String REPOSITORY_NAME = "test-remore-store-repo"; + + @Override + public Settings indexSettings() { + return Settings.builder() + .put(super.indexSettings()) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, REPOSITORY_NAME) + .build(); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.REMOTE_STORE, "true").build(); + } + + @Before + public void setup() { + internalCluster().startClusterManagerOnlyNode(); + Path absolutePath = randomRepoPath().toAbsolutePath(); + assertAcked( + clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) + ); + } + + @After + public void teardown() { + assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); + } +} diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index fbd3349240743..c5fcd5a4bc976 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -137,7 +137,9 @@ public ReadOnlyEngine( } if (seqNoStats == null) { seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos); - ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); + if (config.getIndexSettings().isRemoteTranslogStoreEnabled() == false) { + ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); + } } this.seqNoStats = seqNoStats; this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); @@ -186,7 +188,7 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStat // In addition to that we only execute the check if the index the engine belongs to has been // created after the refactoring of the Close Index API and its TransportVerifyShardBeforeCloseAction // that guarantee that all operations have been flushed to Lucene. - assert assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint()); + assertMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats.getMaxSeqNo(), seqNoStats.getGlobalCheckpoint()); if (seqNoStats.getMaxSeqNo() != seqNoStats.getGlobalCheckpoint()) { throw new IllegalStateException( "Maximum sequence number [" diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index d39b31f923eae..4adca3eaa9b79 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1118,8 +1118,12 @@ public long getAsLong() { * @param reason the reason the global checkpoint was updated */ public synchronized void updateGlobalCheckpointOnReplica(final long newGlobalCheckpoint, final String reason) { - assert invariant(); assert primaryMode == false; + updateGlobalCheckpoint(newGlobalCheckpoint, reason); + } + + public synchronized void updateGlobalCheckpoint(final long newGlobalCheckpoint, final String reason) { + assert invariant(); /* * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4a2d0f0198051..57dde2c1fb871 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -158,11 +158,15 @@ import org.opensearch.index.store.Store.MetadataSnapshot; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreStats; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; +import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.index.translog.TranslogStats; +import org.opensearch.index.translog.transfer.FileTransferTracker; +import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.warmer.ShardIndexWarmerService; import org.opensearch.index.warmer.WarmerStats; import org.opensearch.indices.IndexingMemoryController; @@ -178,6 +182,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.rest.RestStatus; import org.opensearch.search.suggest.completion.CompletionStats; import org.opensearch.threadpool.ThreadPool; @@ -2181,6 +2186,10 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t if (indexSettings.isRemoteStoreEnabled()) { syncSegmentsFromRemoteSegmentStore(false); } + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + syncTranslogFilesFromRemoteTranslog(); + loadGlobalCheckpointToReplicationTracker(); + } // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata). final Engine newEngine = engineFactory.newReadWriteEngine(config); onNewEngine(newEngine); @@ -2462,10 +2471,10 @@ public void recoverFromStore(ActionListener listener) { storeRecovery.recoverFromStore(this, listener); } - public void restoreFromRemoteStore(Repository repository, ActionListener listener) { + public void restoreFromRemoteStore(ActionListener listener) { assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard"; StoreRecovery storeRecovery = new StoreRecovery(shardId, logger); - storeRecovery.recoverFromRemoteStore(this, repository, listener); + storeRecovery.recoverFromRemoteStore(this, listener); } public void restoreFromRepository(Repository repository, ActionListener listener) { @@ -3079,7 +3088,8 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move * to recovery finalization, or even finished recovery before the update arrives here. */ - assert state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED + assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) + || (indexSettings.isRemoteTranslogStoreEnabled() == true && state() != IndexShardState.RECOVERING) : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " @@ -3264,14 +3274,7 @@ public void startRecovery( executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore); break; case REMOTE_STORE: - final Repository remoteTranslogRepo; - final String remoteTranslogRepoName = indexSettings.getRemoteStoreTranslogRepository(); - if (remoteTranslogRepoName != null) { - remoteTranslogRepo = repositoriesService.repository(remoteTranslogRepoName); - } else { - remoteTranslogRepo = null; - } - executeRecovery("from remote store", recoveryState, recoveryListener, l -> restoreFromRemoteStore(remoteTranslogRepo, l)); + executeRecovery("from remote store", recoveryState, recoveryListener, this::restoreFromRemoteStore); break; case PEER: try { @@ -4338,6 +4341,10 @@ public void close() throws IOException { if (indexSettings.isRemoteStoreEnabled()) { syncSegmentsFromRemoteSegmentStore(false); } + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + syncTranslogFilesFromRemoteTranslog(); + loadGlobalCheckpointToReplicationTracker(); + } newEngineReference.set(engineFactory.newReadWriteEngine(newEngineConfig(replicationTracker))); onNewEngine(newEngineReference.get()); } @@ -4371,6 +4378,22 @@ public void close() throws IOException { onSettingsChanged(); } + public void syncTranslogFilesFromRemoteTranslog() throws IOException { + TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); + assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; + Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); + TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( + blobStoreRepository, + getThreadPool(), + shardId, + fileTransferTracker + ); + RemoteFsTranslog.download(translogTransferManager, shardPath().resolveTranslog()); + } + /** * Downloads segments from remote segment store. This method will download segments till * last refresh checkpoint. diff --git a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java index 972a76bc17eb5..c0925703288b4 100644 --- a/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/opensearch/index/shard/StoreRecovery.java @@ -60,15 +60,11 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.snapshots.IndexShardRestoreFailedException; import org.opensearch.index.store.Store; -import org.opensearch.index.translog.RemoteFsTranslog; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.FileTransferTracker; -import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.repositories.IndexId; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import java.io.IOException; import java.util.Arrays; @@ -118,13 +114,13 @@ void recoverFromStore(final IndexShard indexShard, ActionListener liste } } - void recoverFromRemoteStore(final IndexShard indexShard, Repository repository, ActionListener listener) { + void recoverFromRemoteStore(final IndexShard indexShard, ActionListener listener) { if (canRecover(indexShard)) { RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType(); assert recoveryType == RecoverySource.Type.REMOTE_STORE : "expected remote store recovery type but was: " + recoveryType; ActionListener.completeWith(recoveryListener(indexShard, listener), () -> { logger.debug("starting recovery from remote store ..."); - recoverFromRemoteStore(indexShard, repository); + recoverFromRemoteStore(indexShard); return true; }); } else { @@ -439,7 +435,7 @@ private ActionListener recoveryListener(IndexShard indexShard, ActionLi }); } - private void recoverFromRemoteStore(IndexShard indexShard, Repository repository) throws IndexShardRecoveryException { + private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardRecoveryException { final Store remoteStore = indexShard.remoteStore(); if (remoteStore == null) { throw new IndexShardRecoveryException( @@ -460,8 +456,8 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository if (store.directory().listAll().length == 0) { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); } - if (repository != null) { - syncTranslogFilesFromRemoteTranslog(indexShard, repository); + if (indexShard.indexSettings.isRemoteTranslogStoreEnabled()) { + indexShard.syncTranslogFilesFromRemoteTranslog(); } else { bootstrap(indexShard, store); } @@ -480,19 +476,6 @@ private void recoverFromRemoteStore(IndexShard indexShard, Repository repository } } - private void syncTranslogFilesFromRemoteTranslog(IndexShard indexShard, Repository repository) throws IOException { - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); - TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( - blobStoreRepository, - indexShard.getThreadPool(), - shardId, - fileTransferTracker - ); - RemoteFsTranslog.download(translogTransferManager, indexShard.shardPath().resolveTranslog()); - } - /** * Recovers the state of the shard from the store. */ diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java index e439a56581c14..339e16db6f360 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteBlobStoreInternalTranslogFactory.java @@ -71,4 +71,8 @@ public Translog newTranslog( primaryModeSupplier ); } + + public Repository getRepository() { + return repository; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 5180469d83e1c..27a6308edbaf8 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -14,6 +14,7 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; +import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; @@ -85,6 +86,9 @@ public RemoteFsTranslog( try { download(translogTransferManager, location); Checkpoint checkpoint = readCheckpoint(location); + assert globalCheckpointSupplier instanceof ReplicationTracker + : "globalCheckpointSupplier is not instance of ReplicationTracker"; + ((ReplicationTracker) globalCheckpointSupplier).updateGlobalCheckpoint(checkpoint.globalCheckpoint, "RemoteFsTranslog init"); this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); @@ -123,15 +127,24 @@ public static void download(TranslogTransferManager translogTransferManager, Pat if (Files.notExists(location)) { Files.createDirectories(location); } - // Delete translog files on local before downloading from remote - for (Path file : FileSystemUtils.files(location)) { - Files.delete(file); - } Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { String generation = Long.toString(i); + boolean ckpExists = Files.exists(location.resolve(Translog.getCommitCheckpointFileName(i))); + boolean tlogExists = Files.exists(location.resolve(Translog.getFilename(i))); + if(ckpExists || tlogExists) { + if (ckpExists) { + Files.delete(location.resolve(Translog.getCommitCheckpointFileName(i))); + } + if (tlogExists) { + Files.delete(location.resolve(Translog.getFilename(i))); + } + } translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } + if (Files.exists(location.resolve(Translog.CHECKPOINT_FILE_NAME))) { + Files.delete(location.resolve(Translog.CHECKPOINT_FILE_NAME)); + } // We copy the latest generation .ckp file to translog.ckp so that flows that depend on // existence of translog.ckp file work in the same way Files.copy( diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 9fc360e182439..8e3acf1346235 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -2864,7 +2864,7 @@ public void testRestoreShardFromRemoteStore(boolean performFlush) throws IOExcep DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); target.markAsRecovering("remote_store", new RecoveryState(routing, localNode, null)); final PlainActionFuture future = PlainActionFuture.newFuture(); - target.restoreFromRemoteStore(null, future); + target.restoreFromRemoteStore(future); target.remoteStore().decRef(); assertTrue(future.actionGet()); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index d5acf3efde6e6..84b03e95a7c07 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -41,8 +41,10 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.MissingHistoryOperationsException; +import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.LocalCheckpointTrackerTests; +import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -93,6 +95,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; +import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -103,7 +106,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); protected RemoteFsTranslog translog; - private AtomicLong globalCheckpoint; + private ReplicationTracker replicationTracker; protected Path translogDir; // A default primary term is used by translog instances created in this test. private final AtomicLong primaryTerm = new AtomicLong(); @@ -155,7 +158,17 @@ private RemoteFsTranslog create(Path path) throws IOException { private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { this.repository = repository; - globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + replicationTracker = new ReplicationTracker( + shardId, + translogUUID, + IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), + primaryTerm.get(), + UNASSIGNED_SEQ_NO, + value -> {}, + System::currentTimeMillis, + (leases, listener) -> {}, + () -> SafeCommitInfo.EMPTY + ); final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); @@ -164,7 +177,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin translogConfig, translogUUID, deletionPolicy, - () -> globalCheckpoint.get(), + replicationTracker, primaryTerm::get, getPersistedSeqNoConsumer(), repository, @@ -1259,7 +1272,7 @@ public int write(ByteBuffer src) throws IOException { config, translogUUID, new DefaultTranslogDeletionPolicy(-1, -1, 0), - () -> SequenceNumbers.NO_OPS_PERFORMED, + replicationTracker, primaryTerm::get, persistedSeqNos::add, repository, From 7c2cfc5941e279c0789e9d55c350bd0747083767 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 21 Feb 2023 16:25:35 +0530 Subject: [PATCH 2/9] Apply Spotless fixes Signed-off-by: Sachin Kale --- .../opensearch/index/translog/RemoteFsTranslog.java | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 27a6308edbaf8..f8dafce0b2d31 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -9,7 +9,6 @@ package org.opensearch.index.translog; import org.opensearch.common.SetOnce; -import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; @@ -132,13 +131,11 @@ public static void download(TranslogTransferManager translogTransferManager, Pat String generation = Long.toString(i); boolean ckpExists = Files.exists(location.resolve(Translog.getCommitCheckpointFileName(i))); boolean tlogExists = Files.exists(location.resolve(Translog.getFilename(i))); - if(ckpExists || tlogExists) { - if (ckpExists) { - Files.delete(location.resolve(Translog.getCommitCheckpointFileName(i))); - } - if (tlogExists) { - Files.delete(location.resolve(Translog.getFilename(i))); - } + if (ckpExists) { + Files.delete(location.resolve(Translog.getCommitCheckpointFileName(i))); + } + if (tlogExists) { + Files.delete(location.resolve(Translog.getFilename(i))); } translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } From 39b6573fad1b7f56a53ff0bdc499f1e5413aed63 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Tue, 21 Feb 2023 17:59:03 +0530 Subject: [PATCH 3/9] Remove redundant code Signed-off-by: Sachin Kale --- .../index/seqno/ReplicationTracker.java | 6 +----- .../index/translog/RemoteFsTranslog.java | 3 --- .../index/translog/RemoteFSTranslogTests.java | 21 ++++--------------- 3 files changed, 5 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java index 4adca3eaa9b79..d39b31f923eae 100644 --- a/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/opensearch/index/seqno/ReplicationTracker.java @@ -1118,12 +1118,8 @@ public long getAsLong() { * @param reason the reason the global checkpoint was updated */ public synchronized void updateGlobalCheckpointOnReplica(final long newGlobalCheckpoint, final String reason) { - assert primaryMode == false; - updateGlobalCheckpoint(newGlobalCheckpoint, reason); - } - - public synchronized void updateGlobalCheckpoint(final long newGlobalCheckpoint, final String reason) { assert invariant(); + assert primaryMode == false; /* * The global checkpoint here is a local knowledge which is updated under the mandate of the primary. It can happen that the primary * information is lagging compared to a replica (e.g., if a replica is promoted to primary but has stale info relative to other diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f8dafce0b2d31..e2d04a7178b8a 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -85,9 +85,6 @@ public RemoteFsTranslog( try { download(translogTransferManager, location); Checkpoint checkpoint = readCheckpoint(location); - assert globalCheckpointSupplier instanceof ReplicationTracker - : "globalCheckpointSupplier is not instance of ReplicationTracker"; - ((ReplicationTracker) globalCheckpointSupplier).updateGlobalCheckpoint(checkpoint.globalCheckpoint, "RemoteFsTranslog init"); this.readers.addAll(recoverFromFiles(checkpoint)); if (readers.isEmpty()) { throw new IllegalStateException("at least one reader must be recovered"); diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java index 84b03e95a7c07..d5acf3efde6e6 100644 --- a/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/RemoteFSTranslogTests.java @@ -41,10 +41,8 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.index.IndexSettings; import org.opensearch.index.engine.MissingHistoryOperationsException; -import org.opensearch.index.engine.SafeCommitInfo; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.LocalCheckpointTrackerTests; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; @@ -95,7 +93,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.opensearch.common.util.BigArrays.NON_RECYCLING_INSTANCE; -import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.index.translog.SnapshotMatchers.containsOperationsInAnyOrder; import static org.opensearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy; @@ -106,7 +103,7 @@ public class RemoteFSTranslogTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); protected RemoteFsTranslog translog; - private ReplicationTracker replicationTracker; + private AtomicLong globalCheckpoint; protected Path translogDir; // A default primary term is used by translog instances created in this test. private final AtomicLong primaryTerm = new AtomicLong(); @@ -158,17 +155,7 @@ private RemoteFsTranslog create(Path path) throws IOException { private RemoteFsTranslog create(Path path, BlobStoreRepository repository, String translogUUID) throws IOException { this.repository = repository; - replicationTracker = new ReplicationTracker( - shardId, - translogUUID, - IndexSettingsModule.newIndexSettings("index", Settings.EMPTY), - primaryTerm.get(), - UNASSIGNED_SEQ_NO, - value -> {}, - System::currentTimeMillis, - (leases, listener) -> {}, - () -> SafeCommitInfo.EMPTY - ); + globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); final TranslogConfig translogConfig = getTranslogConfig(path); final TranslogDeletionPolicy deletionPolicy = createTranslogDeletionPolicy(translogConfig.getIndexSettings()); threadPool = new TestThreadPool(getClass().getName()); @@ -177,7 +164,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin translogConfig, translogUUID, deletionPolicy, - replicationTracker, + () -> globalCheckpoint.get(), primaryTerm::get, getPersistedSeqNoConsumer(), repository, @@ -1272,7 +1259,7 @@ public int write(ByteBuffer src) throws IOException { config, translogUUID, new DefaultTranslogDeletionPolicy(-1, -1, 0), - replicationTracker, + () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTerm::get, persistedSeqNos::add, repository, From ed97a69b9d484552ac6f5036759a654f41562311 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Wed, 1 Mar 2023 16:21:07 +0530 Subject: [PATCH 4/9] Fix spotless Signed-off-by: Sachin Kale --- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e2d04a7178b8a..e631b2b48d357 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -13,7 +13,6 @@ import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; import org.opensearch.core.internal.io.IOUtils; -import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.index.translog.transfer.FileTransferTracker; From 035db36fd8db8cc193b56431e39f0fd447cad8ff Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 6 Mar 2023 15:00:34 +0530 Subject: [PATCH 5/9] Fix test Signed-off-by: Sachin Kale --- .../index/translog/RemoteFsTranslog.java | 38 ++++++++++++++----- .../transfer/TranslogTransferManager.java | 17 ++++++--- .../TranslogTransferManagerTests.java | 28 ++++++++++++-- 3 files changed, 65 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index e631b2b48d357..a1987abf6db39 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -9,6 +9,7 @@ package org.opensearch.index.translog; import org.opensearch.common.SetOnce; +import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.lease.Releasable; import org.opensearch.common.lease.Releasables; import org.opensearch.common.util.concurrent.ReleasableLock; @@ -27,13 +28,16 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * A Translog implementation which syncs local FS with a remote store @@ -116,24 +120,17 @@ public RemoteFsTranslog( } public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { - TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { if (Files.notExists(location)) { Files.createDirectories(location); + } else { + deleteTranslogFilesNotUploaded(location, translogMetadata.getGeneration()); } Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { String generation = Long.toString(i); - boolean ckpExists = Files.exists(location.resolve(Translog.getCommitCheckpointFileName(i))); - boolean tlogExists = Files.exists(location.resolve(Translog.getFilename(i))); - if (ckpExists) { - Files.delete(location.resolve(Translog.getCommitCheckpointFileName(i))); - } - if (tlogExists) { - Files.delete(location.resolve(Translog.getFilename(i))); - } - translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); + translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location, false); } if (Files.exists(location.resolve(Translog.CHECKPOINT_FILE_NAME))) { Files.delete(location.resolve(Translog.CHECKPOINT_FILE_NAME)); @@ -147,6 +144,27 @@ public static void download(TranslogTransferManager translogTransferManager, Pat } } + private static void deleteTranslogFilesNotUploaded(Path location, long uploadedGeneration) throws IOException { + // Delete translog files with generation > translogMetadata.getGeneration() + List generationsNotUploaded = Arrays.stream(FileSystemUtils.files(location)) + .map(filePath -> filePath.getFileName().toString()) + .filter(filename -> filename.endsWith(TRANSLOG_FILE_SUFFIX)) + .map(filename -> { + long generation = Long.parseLong( + filename.substring(TRANSLOG_FILE_PREFIX.length(), filename.length() - TRANSLOG_FILE_SUFFIX.length()) + ); + return generation > uploadedGeneration ? generation : -1; + }) + .filter(generation -> generation > -1) + .collect(Collectors.toList()); + for (Long generation : generationsNotUploaded) { + Path checkpointFileName = location.resolve(Translog.getCommitCheckpointFileName(generation)); + Path translogFileName = location.resolve(Translog.getFilename(generation)); + Files.deleteIfExists(checkpointFileName); + Files.deleteIfExists(translogFileName); + } + } + public static TranslogTransferManager buildTranslogTransferManager( BlobStoreRepository blobStoreRepository, ThreadPool threadPool, diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 95536317bcc1b..a6b8a1a9e3175 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -133,19 +133,26 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } } - public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException { + public boolean downloadTranslog(String primaryTerm, String generation, Path location, boolean override) throws IOException { logger.info( "Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", primaryTerm, generation, location ); - // Download Checkpoint file from remote to local FS String ckpFileName = Translog.getCommitCheckpointFileName(Long.parseLong(generation)); - downloadToFS(ckpFileName, location, primaryTerm); - // Download translog file from remote to local FS String translogFilename = Translog.getFilename(Long.parseLong(generation)); - downloadToFS(translogFilename, location, primaryTerm); + if (override == false && Files.exists(location.resolve(ckpFileName)) && Files.exists(location.resolve(translogFilename))) { + fileTransferTracker.add(ckpFileName, true); + fileTransferTracker.add(translogFilename, true); + } else { + // Download Checkpoint file from remote to local FS + Files.deleteIfExists(location.resolve(ckpFileName)); + downloadToFS(ckpFileName, location, primaryTerm); + // Download translog file from remote to local FS + Files.deleteIfExists(location.resolve(translogFilename)); + downloadToFS(translogFilename, location, primaryTerm); + } return true; } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index c5c5315dee09e..4723b4d21e878 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -43,6 +43,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; @LuceneTestCase.SuppressFileSystems("*") public class TranslogTransferManagerTests extends OpenSearchTestCase { @@ -246,7 +247,7 @@ public void testDownloadTranslog() throws IOException { assertFalse(Files.exists(location.resolve("translog-23.tlog"))); assertFalse(Files.exists(location.resolve("translog-23.ckp"))); - translogTransferManager.downloadTranslog("12", "23", location); + translogTransferManager.downloadTranslog("12", "23", location, false); assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } @@ -266,7 +267,7 @@ public void testDownloadTranslogAlreadyExists() throws IOException { new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) ); - translogTransferManager.downloadTranslog("12", "23", location); + translogTransferManager.downloadTranslog("12", "23", location, true); verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); verify(transferService).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); @@ -274,6 +275,27 @@ public void testDownloadTranslogAlreadyExists() throws IOException { assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } + public void testDownloadTranslogAlreadyExistsOverrideFalse() throws IOException { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + Path location = createTempDir(); + Files.createFile(location.resolve("translog-23.tlog")); + Files.createFile(location.resolve("translog-23.ckp")); + + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); + + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( + new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) + ); + when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.ckp"))).thenReturn( + new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) + ); + + translogTransferManager.downloadTranslog("12", "23", location, false); + + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.tlog")); + verify(transferService, times(0)).downloadBlob(any(BlobPath.class), eq("translog-23.ckp")); + } + public void testDownloadTranslogWithTrackerUpdated() throws IOException { FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); @@ -290,7 +312,7 @@ public void testDownloadTranslogWithTrackerUpdated() throws IOException { new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) ); - translogTransferManager.downloadTranslog("12", "23", location); + translogTransferManager.downloadTranslog("12", "23", location, true); verify(transferService).downloadBlob(any(BlobPath.class), eq(translogFile)); verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile)); From 079137fe35e7a161193a046564bbb2d0008b927e Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 20 Mar 2023 13:02:38 +0530 Subject: [PATCH 6/9] Address PR comments Signed-off-by: Sachin Kale --- .../index/translog/RemoteFsTranslog.java | 30 ++++++++----------- .../transfer/TranslogTransferManager.java | 6 +++- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index a1987abf6db39..4c0cfa763f64a 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -30,14 +30,12 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Semaphore; import java.util.function.BooleanSupplier; import java.util.function.LongConsumer; import java.util.function.LongSupplier; -import java.util.stream.Collectors; /** * A Translog implementation which syncs local FS with a remote store @@ -130,7 +128,7 @@ public static void download(TranslogTransferManager translogTransferManager, Pat Map generationToPrimaryTermMapper = translogMetadata.getGenerationToPrimaryTermMapper(); for (long i = translogMetadata.getGeneration(); i >= translogMetadata.getMinTranslogGeneration(); i--) { String generation = Long.toString(i); - translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location, false); + translogTransferManager.downloadTranslog(generationToPrimaryTermMapper.get(generation), generation, location); } if (Files.exists(location.resolve(Translog.CHECKPOINT_FILE_NAME))) { Files.delete(location.resolve(Translog.CHECKPOINT_FILE_NAME)); @@ -146,23 +144,21 @@ public static void download(TranslogTransferManager translogTransferManager, Pat private static void deleteTranslogFilesNotUploaded(Path location, long uploadedGeneration) throws IOException { // Delete translog files with generation > translogMetadata.getGeneration() - List generationsNotUploaded = Arrays.stream(FileSystemUtils.files(location)) + Arrays.stream(FileSystemUtils.files(location)) .map(filePath -> filePath.getFileName().toString()) .filter(filename -> filename.endsWith(TRANSLOG_FILE_SUFFIX)) - .map(filename -> { - long generation = Long.parseLong( + .map( + filename -> Long.parseLong( filename.substring(TRANSLOG_FILE_PREFIX.length(), filename.length() - TRANSLOG_FILE_SUFFIX.length()) - ); - return generation > uploadedGeneration ? generation : -1; - }) - .filter(generation -> generation > -1) - .collect(Collectors.toList()); - for (Long generation : generationsNotUploaded) { - Path checkpointFileName = location.resolve(Translog.getCommitCheckpointFileName(generation)); - Path translogFileName = location.resolve(Translog.getFilename(generation)); - Files.deleteIfExists(checkpointFileName); - Files.deleteIfExists(translogFileName); - } + ) + ) + .filter(generation -> generation > uploadedGeneration) + .forEach( + generation -> IOUtils.deleteFilesIgnoringExceptions( + location.resolve(Translog.getCommitCheckpointFileName(generation)), + location.resolve(Translog.getFilename(generation)) + ) + ); } public static TranslogTransferManager buildTranslogTransferManager( diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index a6b8a1a9e3175..c64f46a59f41b 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -133,7 +133,11 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } } - public boolean downloadTranslog(String primaryTerm, String generation, Path location, boolean override) throws IOException { + public boolean downloadTranslog(String primaryTerm, String generation, Path location) throws IOException { + return downloadTranslog(primaryTerm, generation, location, false); + } + + boolean downloadTranslog(String primaryTerm, String generation, Path location, boolean override) throws IOException { logger.info( "Downloading translog files with: Primary Term = {}, Generation = {}, Location = {}", primaryTerm, From af6974973ed9652bd3e844bca354cf3bfceb7d47 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 27 Mar 2023 13:36:54 +0530 Subject: [PATCH 7/9] Incorporate PR comments Signed-off-by: Sachin Kale --- .../opensearch/remotestore/RemoteStoreIT.java | 4 ---- .../opensearch/index/shard/IndexShard.java | 11 +---------- .../index/translog/RemoteFsTranslog.java | 19 ++++++++++++++++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 86e4e50a08a38..1644d1c3e63ba 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -282,22 +282,18 @@ public void testPeerRecoveryWithRemoteStoreNoRemoteTranslogRefresh() throws Exce testPeerRecovery(false, randomIntBetween(2, 5), false); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataFlush() throws Exception { testPeerRecovery(true, 1, true); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogFlush() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), true); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() throws Exception { testPeerRecovery(true, 1, false); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/6193") public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), false); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 57dde2c1fb871..5020b23fff792 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4382,16 +4382,7 @@ public void syncTranslogFilesFromRemoteTranslog() throws IOException { TranslogFactory translogFactory = translogFactorySupplier.apply(indexSettings, shardRouting); assert translogFactory instanceof RemoteBlobStoreInternalTranslogFactory; Repository repository = ((RemoteBlobStoreInternalTranslogFactory) translogFactory).getRepository(); - assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; - BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; - FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); - TranslogTransferManager translogTransferManager = RemoteFsTranslog.buildTranslogTransferManager( - blobStoreRepository, - getThreadPool(), - shardId, - fileTransferTracker - ); - RemoteFsTranslog.download(translogTransferManager, shardPath().resolveTranslog()); + RemoteFsTranslog.download(repository, shardId, getThreadPool(), shardPath().resolveTranslog()); } /** diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 4c0cfa763f64a..8222018dd7722 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -22,6 +22,7 @@ import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; +import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -47,7 +48,6 @@ */ public class RemoteFsTranslog extends Translog { - private final BlobStoreRepository blobStoreRepository; private final TranslogTransferManager translogTransferManager; private final FileTransferTracker fileTransferTracker; private final BooleanSupplier primaryModeSupplier; @@ -78,7 +78,6 @@ public RemoteFsTranslog( BooleanSupplier primaryModeSupplier ) throws IOException { super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer); - this.blobStoreRepository = blobStoreRepository; this.primaryModeSupplier = primaryModeSupplier; fileTransferTracker = new FileTransferTracker(shardId); this.translogTransferManager = buildTranslogTransferManager(blobStoreRepository, threadPool, shardId, fileTransferTracker); @@ -117,7 +116,21 @@ public RemoteFsTranslog( } } - public static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { + public static void download(Repository repository, ShardId shardId, ThreadPool threadPool, Path location) throws IOException { + assert repository instanceof BlobStoreRepository : "repository should be instance of BlobStoreRepository"; + BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository; + FileTransferTracker fileTransferTracker = new FileTransferTracker(shardId); + TranslogTransferManager translogTransferManager = buildTranslogTransferManager( + blobStoreRepository, + threadPool, + shardId, + fileTransferTracker + ); + RemoteFsTranslog.download(translogTransferManager, location); + } + + + private static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { if (Files.notExists(location)) { From 0fa7a9cff5be024dcbb53fecd1b031f1277841a3 Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Mon, 27 Mar 2023 16:48:25 +0530 Subject: [PATCH 8/9] Fix spotless error Signed-off-by: Sachin Kale --- .../src/main/java/org/opensearch/index/shard/IndexShard.java | 3 --- .../java/org/opensearch/index/translog/RemoteFsTranslog.java | 1 - 2 files changed, 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 5020b23fff792..0f65379ba1f3d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -165,8 +165,6 @@ import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogRecoveryRunner; import org.opensearch.index.translog.TranslogStats; -import org.opensearch.index.translog.transfer.FileTransferTracker; -import org.opensearch.index.translog.transfer.TranslogTransferManager; import org.opensearch.index.warmer.ShardIndexWarmerService; import org.opensearch.index.warmer.WarmerStats; import org.opensearch.indices.IndexingMemoryController; @@ -182,7 +180,6 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; -import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.rest.RestStatus; import org.opensearch.search.suggest.completion.CompletionStats; import org.opensearch.threadpool.ThreadPool; diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 8222018dd7722..3e759613dc030 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -129,7 +129,6 @@ public static void download(Repository repository, ShardId shardId, ThreadPool t RemoteFsTranslog.download(translogTransferManager, location); } - private static void download(TranslogTransferManager translogTransferManager, Path location) throws IOException { TranslogTransferMetadata translogMetadata = translogTransferManager.readMetadata(); if (translogMetadata != null) { From 66fbbcc82136852a7696a6b4732b94865259254c Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Thu, 30 Mar 2023 19:18:29 +0530 Subject: [PATCH 9/9] Address PR comments Signed-off-by: Sachin Kale --- .../java/org/opensearch/index/engine/ReadOnlyEngine.java | 9 +++------ .../main/java/org/opensearch/index/shard/IndexShard.java | 5 +++-- .../org/opensearch/index/engine/ReadOnlyEngineTests.java | 3 +-- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index c5fcd5a4bc976..e73e748d7cd65 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -137,9 +137,7 @@ public ReadOnlyEngine( } if (seqNoStats == null) { seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos); - if (config.getIndexSettings().isRemoteTranslogStoreEnabled() == false) { - ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); - } + ensureMaxSeqNoEqualsToGlobalCheckpoint(seqNoStats); } this.seqNoStats = seqNoStats; this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory); @@ -179,7 +177,7 @@ public Translog.Operation next() { } protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStats) { - if (requireCompleteHistory == false) { + if (requireCompleteHistory == false || config().getIndexSettings().isRemoteTranslogStoreEnabled()) { return; } // Before 3.0 the global checkpoint is not known and up to date when the engine is created after @@ -200,9 +198,8 @@ protected void ensureMaxSeqNoEqualsToGlobalCheckpoint(final SeqNoStats seqNoStat } } - protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { assert maxSeqNo == globalCheckpoint : "max seq. no. [" + maxSeqNo + "] does not match [" + globalCheckpoint + "]"; - return true; } @Override diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0f65379ba1f3d..8bd6bfb795082 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3084,10 +3084,11 @@ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final S * calculations of the global checkpoint. However, we can not assert that we are in the translog stage of recovery here as * while the global checkpoint update may have emanated from the primary when we were in that state, we could subsequently move * to recovery finalization, or even finished recovery before the update arrives here. + * When remote translog is enabled for an index, replication operation is limited to primary term validation and does not + * update local checkpoint at replica, so the local checkpoint at replica can be less than globalCheckpoint. */ assert (state() != IndexShardState.POST_RECOVERY && state() != IndexShardState.STARTED) - || (indexSettings.isRemoteTranslogStoreEnabled() == true && state() != IndexShardState.RECOVERING) - : "supposedly in-sync shard copy received a global checkpoint [" + || indexSettings.isRemoteTranslogStoreEnabled() : "supposedly in-sync shard copy received a global checkpoint [" + globalCheckpoint + "] " + "that is higher than its local checkpoint [" diff --git a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java index 2ce7c62cbdfbd..1a718acf1c148 100644 --- a/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/ReadOnlyEngineTests.java @@ -202,9 +202,8 @@ public void testEnsureMaxSeqNoIsEqualToGlobalCheckpoint() throws IOException { IllegalStateException.class, () -> new ReadOnlyEngine(config, null, null, true, Function.identity(), true) { @Override - protected boolean assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { + protected void assertMaxSeqNoEqualsToGlobalCheckpoint(final long maxSeqNo, final long globalCheckpoint) { // we don't want the assertion to trip in this test - return true; } } );