Skip to content

Commit

Permalink
Add configuration for custom block sized index inputs
Browse files Browse the repository at this point in the history
Signed-off-by: Kunal Kotwani <[email protected]>
  • Loading branch information
kotwanikunal committed Mar 18, 2023
1 parent 5f81930 commit 3b94229
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Return success on DeletePits when no PITs exist. ([#6544](https://github.com/opensearch-project/OpenSearch/pull/6544))
- Add node repurpose command for search nodes ([#6517](https://github.com/opensearch-project/OpenSearch/pull/6517))
- [Segment Replication] Apply backpressure when replicas fall behind ([#6563](https://github.com/opensearch-project/OpenSearch/pull/6563))
- Add index input block size configuration for searchable snapshots ([#6743](https://github.com/opensearch-project/OpenSearch/pull/6743))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Expand Down Expand Up @@ -110,4 +111,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.5...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.Index;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.monitor.fs.FsInfo;
import org.opensearch.repositories.fs.FsRepository;
Expand All @@ -58,6 +61,19 @@
@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
public final class SearchableSnapshotIT extends AbstractSnapshotIntegTestCase {

// Block size that nodeSettings() passes to nodes; starts at the index input's default block size.
// Deliberately non-final: testBlockSizeConfiguration reassigns it before asking for new search nodes.
// NOTE(review): presumably nodeSettings() is consulted whenever the test cluster starts a node - confirm.
private ByteSizeValue blockSizeValue = new ByteSizeValue(
OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE,
ByteSizeUnit.BYTES
);

/**
 * Builds the settings for the node with the given ordinal: everything the parent class supplies,
 * plus the searchable snapshot block size currently held in {@code blockSizeValue}.
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settingsBuilder = Settings.builder();
    settingsBuilder.put(super.nodeSettings(nodeOrdinal));
    settingsBuilder.put(RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey(), blockSizeValue);
    return settingsBuilder.build();
}

@Override
protected boolean addMockInternalEngine() {
return false;
Expand Down Expand Up @@ -453,6 +469,51 @@ public void testFileCacheRestore() throws Exception {
}
}

/**
 * Snapshots an index once, then restores that snapshot under 2 KB, 4 KB and 8 KB block sizes in
 * turn, asserting the document count of the restored index on every round.
 */
public void testBlockSizeConfiguration() throws Exception {
    final String snapshotName = "test-snap";
    final String repoName = "test-repo";
    final String indexName = "test-idx";
    final String restoredIndexName = indexName + "-copy";
    final int numReplicasIndex = randomIntBetween(1, 4);
    final int numberOfDocs = 1000;
    final Client client = client();
    final int requiredNodes = numReplicasIndex + 1;

    internalCluster().ensureAtLeastNumDataNodes(requiredNodes);
    createIndexWithDocsAndEnsureGreen(numReplicasIndex, numberOfDocs, indexName);
    createRepositoryWithSettings(null, repoName);
    takeSnapshot(client, snapshotName, repoName, indexName);
    deleteIndicesAndEnsureGreen(client, indexName);

    final long[] blockSizesInKb = { 2, 4, 8 };
    for (int round = 0; round < blockSizesInKb.length; round++) {
        if (round > 0) {
            // Stop the search nodes used by the previous round before changing the block size.
            // NOTE(review): presumably so that freshly started nodes read the new value - confirm.
            for (int stopped = 0; stopped < requiredNodes; stopped++) {
                internalCluster().stopRandomSearchNode();
            }
        }

        blockSizeValue = new ByteSizeValue(blockSizesInKb[round], ByteSizeUnit.KB);
        internalCluster().ensureAtLeastNumSearchNodes(requiredNodes);
        restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
        assertDocCount(restoredIndexName, numberOfDocs);
        deleteIndicesAndEnsureGreen(client, restoredIndexName);
    }
}

/**
* Picks a shard out of the cluster state for each given index and asserts
* that the 'index' directory does not exist in the node's file system.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
Expand Down Expand Up @@ -644,7 +645,7 @@ public void apply(Settings value, Settings current, Settings previous) {
*/
public static final Map<String, List<Setting>> FEATURE_FLAGGED_CLUSTER_SETTINGS = Map.of(
FeatureFlags.SEARCHABLE_SNAPSHOT,
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING)
List.of(Node.NODE_SEARCH_CACHE_SIZE_SETTING, RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING)
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand All @@ -45,13 +46,20 @@ public final class RemoteSnapshotDirectory extends Directory {
private final Map<String, BlobStoreIndexShardSnapshot.FileInfo> fileInfoMap;
private final FSDirectory localStoreDir;
private final TransferManager transferManager;

public RemoteSnapshotDirectory(BlobStoreIndexShardSnapshot snapshot, FSDirectory localStoreDir, TransferManager transferManager) {
private final ByteSizeValue blockSize;

public RemoteSnapshotDirectory(
BlobStoreIndexShardSnapshot snapshot,
FSDirectory localStoreDir,
TransferManager transferManager,
ByteSizeValue blockSize
) {
this.fileInfoMap = snapshot.indexFiles()
.stream()
.collect(Collectors.toMap(BlobStoreIndexShardSnapshot.FileInfo::physicalName, f -> f));
this.localStoreDir = localStoreDir;
this.transferManager = transferManager;
this.blockSize = blockSize;
}

@Override
Expand All @@ -74,7 +82,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException {
if (fileInfo.name().startsWith(VIRTUAL_FILE_PREFIX)) {
return new ByteArrayIndexInput(fileInfo.physicalName(), fileInfo.metadata().hash().bytes);
}
return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager);
return new OnDemandBlockSnapshotIndexInput(fileInfo, localStoreDir, transferManager, blockSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@
import org.apache.lucene.store.FSDirectory;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.opensearch.index.store.remote.file.OnDemandBlockSnapshotIndexInput;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.utils.TransferManager;
import org.opensearch.plugins.IndexStorePlugin;
Expand All @@ -38,6 +43,12 @@
* @opensearch.internal
*/
public final class RemoteSnapshotDirectoryFactory implements IndexStorePlugin.DirectoryFactory {
/**
 * Node-scoped byte-size setting, keyed {@code node.searchable_snapshot.block.size}, giving the block
 * size handed to the {@code RemoteSnapshotDirectory} this factory creates. Defaults to
 * {@code OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE} bytes. When this factory reads
 * the setting it throws a {@code SettingsException} if the byte count is not a power of 2.
 */
public static final Setting<ByteSizeValue> SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING = Setting.byteSizeSetting(
"node.searchable_snapshot.block.size",
new ByteSizeValue(OnDemandBlockSnapshotIndexInput.Builder.DEFAULT_BLOCK_SIZE, ByteSizeUnit.BYTES),
Setting.Property.NodeScope
);

public static final String LOCAL_STORE_LOCATION = "RemoteLocalStore";

private final Supplier<RepositoriesService> repositoriesService;
Expand Down Expand Up @@ -73,6 +84,13 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
ShardPath localShardPath,
BlobStoreRepository blobStoreRepository
) throws IOException {
ByteSizeValue blockSize = SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.get(indexSettings.getSettings());
long blockSizeBytes = blockSize.getBytes();
if ((blockSizeBytes & (blockSizeBytes - 1)) != 0) {
throw new SettingsException(
"Invalid configuration for " + SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey() + " - value must be a power of 2"
);
}
final BlobPath blobPath = blobStoreRepository.basePath()
.add("indices")
.add(IndexSettings.SEARCHABLE_SNAPSHOT_INDEX_ID.get(indexSettings.getSettings()))
Expand All @@ -91,7 +109,7 @@ private Future<RemoteSnapshotDirectory> createRemoteSnapshotDirectoryFromSnapsho
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(blobPath);
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
TransferManager transferManager = new TransferManager(blobContainer, remoteStoreFileCache);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager);
return new RemoteSnapshotDirectory(snapshot, localStoreDir, transferManager, blockSize);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,15 @@ Builder blockSizeShift(int blockSizeShift) {
this.blockMask = blockSize - 1;
return this;
}

/**
 * Configures this builder from a block size in bytes and derives the matching mask and shift.
 *
 * @param blockSize block size in bytes; asserted to be positive, below 2^31 and a power of 2
 * @return this builder, for chaining
 */
Builder blockSize(long blockSize) {
    assert blockSize < 1L << 31 && blockSize > 0 : "invalid block size";
    assert ((blockSize & (blockSize - 1)) == 0) : "block size must be a power of 2";
    this.blockSize = (int) blockSize;
    this.blockMask = this.blockSize - 1;
    // Exact integer floor(log2): for a power of 2 this is precisely the shift. The previous
    // Math.log(x) / Math.log(2) form relies on floating-point rounding and can truncate to
    // shift - 1 if the quotient lands just below the integer.
    this.blockSizeShift = 31 - Integer.numberOfLeadingZeros(this.blockSize);
    return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import org.opensearch.index.store.remote.utils.BlobFetchRequest;
import org.opensearch.index.store.remote.utils.TransferManager;
Expand Down Expand Up @@ -57,7 +58,12 @@ public class OnDemandBlockSnapshotIndexInput extends OnDemandBlockIndexInput {
*/
protected final long originalFileSize;

public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory, TransferManager transferManager) {
public OnDemandBlockSnapshotIndexInput(
FileInfo fileInfo,
FSDirectory directory,
TransferManager transferManager,
ByteSizeValue blockSize
) {
this(
"BlockedSnapshotIndexInput(path=\""
+ directory.getDirectory().toString()
Expand All @@ -74,7 +80,8 @@ public OnDemandBlockSnapshotIndexInput(FileInfo fileInfo, FSDirectory directory,
fileInfo.length(),
false,
directory,
transferManager
transferManager,
blockSize
);
}

Expand All @@ -85,10 +92,16 @@ public OnDemandBlockSnapshotIndexInput(
long length,
boolean isClone,
FSDirectory directory,
TransferManager transferManager
TransferManager transferManager,
ByteSizeValue blockSize
) {
this(
OnDemandBlockIndexInput.builder().resourceDescription(resourceDescription).isClone(isClone).offset(offset).length(length),
OnDemandBlockIndexInput.builder()
.blockSize(blockSize.getBytes())
.resourceDescription(resourceDescription)
.isClone(isClone)
.offset(offset)
.length(length),
fileInfo,
directory,
transferManager
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store.remote;

import org.junit.After;
import org.junit.Before;
import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsException;
import org.opensearch.common.unit.ByteSizeUnit;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.ShardId;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.index.store.remote.filecache.FileCacheFactory;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

import static org.apache.lucene.tests.util.LuceneTestCase.createTempDir;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.opensearch.index.IndexSettings.SEARCHABLE_SNAPSHOT_REPOSITORY;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory.SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING;

/**
 * Unit tests for the block size validation performed by {@link RemoteSnapshotDirectoryFactory}.
 */
public class RemoteSnapshotDirectoryFactoryTests extends OpenSearchTestCase {

    private static final String REPO_NAME = "test_repo";
    private static final String INDEX_NAME = "test_index";
    private static final int GIGA_BYTES = 1024 * 1024 * 1024;
    private static final int SHARD_ID = 0;

    private Supplier<RepositoriesService> repositoriesServiceSupplier;
    private ThreadPool threadPool;
    private RepositoriesService repositoriesService;
    private FileCache remoteStoreFileCache;
    private Path path;

    /**
     * Creates the collaborators handed to the factory: a mocked repositories service, a real test
     * thread pool, a file cache and a temporary directory used as the shard path root.
     */
    @Before
    public void setup() throws IOException {
        threadPool = new TestThreadPool("test");
        repositoriesService = mock(RepositoriesService.class);
        // A typed lambda instead of mock(Supplier.class): same result, no raw-type unchecked warning.
        repositoriesServiceSupplier = () -> repositoriesService;
        remoteStoreFileCache = FileCacheFactory.createConcurrentLRUFileCache(GIGA_BYTES, 1);
        path = createTempDir("RemoteSnapshotDirectoryFactoryTests");
    }

    /**
     * Shuts the thread pool down and waits for it to terminate, so its threads do not outlive the test.
     */
    @After
    public void teardown() {
        terminate(threadPool);
    }

    /**
     * A block size of 3 MB is not a power of 2, so creating the directory must fail with a
     * {@link SettingsException}.
     */
    public void testNewDirectoryBlockException() {
        RemoteSnapshotDirectoryFactory remoteSnapshotDirectoryFactory = new RemoteSnapshotDirectoryFactory(
            repositoriesServiceSupplier,
            threadPool,
            remoteStoreFileCache
        );
        Settings repositorySettings = Settings.builder()
            .put(SEARCHABLE_SNAPSHOT_REPOSITORY.getKey(), REPO_NAME)
            .put(SEARACHBLE_SNAPSHOT_BLOCK_SIZE_SETTING.getKey(), new ByteSizeValue(3, ByteSizeUnit.MB))
            .build();
        BlobStoreRepository blobStoreRepository = mock(BlobStoreRepository.class);
        when(repositoriesService.repository(eq(REPO_NAME))).thenReturn(blobStoreRepository);

        IndexSettings settings = newIndexSettings(newIndexMeta(INDEX_NAME, repositorySettings), Settings.EMPTY);
        ShardPath shardPath = new ShardPath(
            false,
            path.resolve(INDEX_NAME).resolve(String.valueOf(SHARD_ID)),
            path.resolve(INDEX_NAME).resolve(String.valueOf(SHARD_ID)),
            new ShardId(INDEX_NAME, INDEX_NAME, SHARD_ID)
        );
        assertThrows(SettingsException.class, () -> remoteSnapshotDirectoryFactory.newDirectory(settings, shardPath));
    }

    /**
     * Builds {@link IndexSettings} for the given metadata, registering the built-in index settings
     * plus any extra settings supplied by the caller.
     */
    private IndexSettings newIndexSettings(IndexMetadata metadata, Settings nodeSettings, Setting<?>... settings) {
        Set<Setting<?>> settingSet = new HashSet<>(IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
        if (settings.length > 0) {
            settingSet.addAll(Arrays.asList(settings));
        }
        return new IndexSettings(metadata, nodeSettings, new IndexScopedSettings(Settings.EMPTY, settingSet));
    }

    /**
     * Builds index metadata with one shard and one replica on the current version, then overlays the
     * supplied index settings on top of those values.
     */
    private IndexMetadata newIndexMeta(String name, Settings indexSettings) {
        Settings build = Settings.builder()
            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(indexSettings)
            .build();
        return IndexMetadata.builder(name).settings(build).build();
    }

}

0 comments on commit 3b94229

Please sign in to comment.