From fb10d90e43e390cdabf9f82b17a90628e760c1da Mon Sep 17 00:00:00 2001 From: Sachin Kale Date: Sat, 25 Nov 2023 18:37:03 +0530 Subject: [PATCH] Add UTs and ITs, fix existing UTs Signed-off-by: Sachin Kale --- .../opensearch/remotestore/RemoteStoreIT.java | 87 +++++++++++++++++++ .../RemoteStoreRefreshListenerTests.java | 4 + 2 files changed, 91 insertions(+) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index f26f0f3e002df..b9ead1eca6871 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -34,6 +34,8 @@ import org.opensearch.test.transport.MockTransportService; import org.hamcrest.MatcherAssert; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; @@ -42,6 +44,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; @@ -211,6 +214,44 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } + public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "3"); + internalCluster().startNode(settings); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(5, 15); + indexData(numberOfIterations, true, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(4))); + } + + public void testStaleCommitDeletionWithMinSegmentFiles_0() throws Exception { + Settings.Builder settings = Settings.builder() + .put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "0"); + internalCluster().startNode(settings); + + createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1)); + int numberOfIterations = randomIntBetween(12, 18); + indexData(numberOfIterations, true, INDEX_NAME); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata"); + int actualFileCount = getFileCount(indexPath); + // We also allow (numberOfIterations + 1) as index creation also triggers refresh. + MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1))); + } + /** * Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster * default. @@ -534,4 +575,50 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte indexShard = getIndexShard(replicaShardNode); assertFalse(indexShard.isSearchIdleSupported()); } + + public void testFallbackToNodeToNodeSegmentCopy() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + + // 1. Create index with 0 replica + createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1)); + ensureGreen(INDEX_NAME); + + // 2. Index docs + indexBulk(INDEX_NAME, 50); + flushAndRefresh(INDEX_NAME); + + // 3. Delete data from remote segment store + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data"); + + try (Stream files = Files.list(segmentDataPath)) { + files.forEach(p -> { + try { + Files.delete(p); + } catch (IOException e) { + // Ignore + } + }); + } + + // 4. Start recovery by changing number of replicas to 1 + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(INDEX_NAME) + .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)) + ); + + // 5. Ensure green and verify number of docs + ensureGreen(INDEX_NAME); + assertBusy(() -> { + assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50); + }); + } } diff --git a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java index 1421f26b89dd9..74da9a3fff19c 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoteStoreRefreshListenerTests.java @@ -33,6 +33,7 @@ import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils; import org.opensearch.index.store.Store; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; +import org.opensearch.indices.recovery.RecoverySettings; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; @@ -550,6 +551,9 @@ private Tuple mockIn RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory(); when(shard.indexSettings()).thenReturn(indexShard.indexSettings()); when(shard.shardId()).thenReturn(indexShard.shardId()); + RecoverySettings recoverySettings = mock(RecoverySettings.class); + when(recoverySettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10); + when(shard.getRecoverySettings()).thenReturn(recoverySettings); RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId()); RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker); refreshListener.afterRefresh(true);