Skip to content

Commit

Permalink
Add UTs and ITs, fix existing UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Nov 25, 2023
1 parent d5ac8ac commit fb10d90
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.opensearch.test.transport.MockTransportService;
import org.hamcrest.MatcherAssert;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -42,6 +44,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
Expand Down Expand Up @@ -211,6 +214,44 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "3");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(4)));
}

public void testStaleCommitDeletionWithMinSegmentFiles_0() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RecoverySettings.CLUSTER_REMOTE_INDEX_MIN_SEGMENT_METADATA_FILES_SETTING.getKey(), "0");
internalCluster().startNode(settings);

createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/metadata");
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations + 1)));
}

/**
* Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster
* default.
Expand Down Expand Up @@ -534,4 +575,50 @@ public void testNoSearchIdleForAnyReplicaCount() throws ExecutionException, Inte
indexShard = getIndexShard(replicaShardNode);
assertFalse(indexShard.isSearchIdleSupported());
}

public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
internalCluster().startClusterManagerOnlyNode();
List<String> dataNodes = internalCluster().startDataOnlyNodes(2);

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

// 2. Index docs
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

// 3. Delete data from remote segment store
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path segmentDataPath = Path.of(String.valueOf(segmentRepoPath), indexUUID, "/0/segments/data");

try (Stream<Path> files = Files.list(segmentDataPath)) {
files.forEach(p -> {
try {
Files.delete(p);
} catch (IOException e) {
// Ignore
}
});
}

// 4. Start recovery by changing number of replicas to 1
assertAcked(
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
);

// 5. Ensure green and verify number of docs
ensureGreen(INDEX_NAME);
assertBusy(() -> {
assertHitCount(client(dataNodes.get(0)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
assertHitCount(client(dataNodes.get(1)).prepareSearch(INDEX_NAME).setSize(0).get(), 50);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -550,6 +551,9 @@ private Tuple<RemoteStoreRefreshListener, RemoteStoreStatsTrackerFactory> mockIn
RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory = indexShard.getRemoteStoreStatsTrackerFactory();
when(shard.indexSettings()).thenReturn(indexShard.indexSettings());
when(shard.shardId()).thenReturn(indexShard.shardId());
RecoverySettings recoverySettings = mock(RecoverySettings.class);
when(recoverySettings.getMinRemoteSegmentMetadataFiles()).thenReturn(10);
when(shard.getRecoverySettings()).thenReturn(recoverySettings);
RemoteSegmentTransferTracker tracker = remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(indexShard.shardId());
RemoteStoreRefreshListener refreshListener = new RemoteStoreRefreshListener(shard, emptyCheckpointPublisher, tracker);
refreshListener.afterRefresh(true);
Expand Down

0 comments on commit fb10d90

Please sign in to comment.